from __future__ import annotations
import copy
import itertools
import logging
import math
import operator
import sys
import warnings
from collections import namedtuple
from collections.abc import Callable, Sequence
from concurrent.futures import ThreadPoolExecutor
from functools import partial, reduce
from itertools import product
from numbers import Integral
from typing import (
TYPE_CHECKING,
Any,
Literal,
TypeAlias,
TypedDict,
TypeVar,
Union,
overload,
)
import numpy as np
import numpy_groupies as npg
import pandas as pd
import toolz as tlz
from scipy.sparse import csc_array, csr_array
from . import xrdtypes
from .aggregate_flox import _prepare_for_flox
from .aggregations import (
AGGREGATIONS,
Aggregation,
AlignedArrays,
Scan,
ScanState,
_atleast_1d,
_initialize_aggregation,
generic_aggregate,
quantile_new_dims_func,
)
from .cache import memoize
from .xrutils import (
is_chunked_array,
is_duck_array,
is_duck_cubed_array,
is_duck_dask_array,
isnull,
module_available,
notnull,
)
if module_available("numpy", minversion="2.0.0"):
from numpy.lib.array_utils import normalize_axis_tuple
else:
from numpy.core.numeric import normalize_axis_tuple # type: ignore[no-redef]
HAS_NUMBAGG = module_available("numbagg", minversion="0.3.0")
if TYPE_CHECKING:
try:
if sys.version_info < (3, 11):
from typing_extensions import Unpack
else:
from typing import Unpack
except (ModuleNotFoundError, ImportError):
Unpack: Any # type: ignore[no-redef]
import cubed.Array as CubedArray
import dask.array.Array as DaskArray
from dask.typing import Graph
T_DuckArray: TypeAlias = np.ndarray | DaskArray | CubedArray # Any ?
T_By: TypeAlias = T_DuckArray
T_Bys = tuple[T_By, ...]
T_ExpectIndex = pd.Index
T_ExpectIndexTuple = tuple[T_ExpectIndex, ...]
T_ExpectIndexOpt = T_ExpectIndex | None
T_ExpectIndexOptTuple = tuple[T_ExpectIndexOpt, ...]
T_Expect = Sequence | np.ndarray | T_ExpectIndex
T_ExpectTuple = tuple[T_Expect, ...]
T_ExpectOpt = Sequence | np.ndarray | T_ExpectIndexOpt
T_ExpectOptTuple = tuple[T_ExpectOpt, ...]
T_ExpectedGroups = T_Expect | T_ExpectOptTuple
T_ExpectedGroupsOpt = T_ExpectedGroups | None
T_Func = str | Callable
T_Funcs = T_Func | Sequence[T_Func]
T_Agg = str | Aggregation
T_Scan = str | Scan
T_Axis = int
T_Axes = tuple[T_Axis, ...]
T_AxesOpt = T_Axis | T_Axes | None
T_Dtypes = np.typing.DTypeLike | Sequence[np.typing.DTypeLike] | None
T_FillValues = np.typing.ArrayLike | Sequence[np.typing.ArrayLike] | None
T_Engine = Literal["flox", "numpy", "numba", "numbagg"]
T_EngineOpt = None | T_Engine
T_Method = Literal["map-reduce", "blockwise", "cohorts"]
T_MethodOpt = None | Literal["map-reduce", "blockwise", "cohorts"]
T_IsBins = bool | Sequence[bool]
T = TypeVar("T")
IntermediateDict = dict[str | Callable, Any]
FinalResultsDict = dict[str, Union["DaskArray", "CubedArray", np.ndarray]]
FactorProps = namedtuple("FactorProps", "offset_group nan_sentinel nanmask")
# This dummy axis is inserted using np.expand_dims
# and then reduced over during the combine stage by
# _simple_combine.
DUMMY_AXIS = -2
logger = logging.getLogger("flox")
class FactorizeKwargs(TypedDict, total=False):
"""Used in _factorize_multiple"""
by: T_Bys
axes: T_Axes
fastpath: bool
expected_groups: T_ExpectIndexOptTuple | None
reindex: bool
sort: bool
def _postprocess_numbagg(result, *, func, fill_value, size, seen_groups):
"""Account for numbagg not providing a fill_value kwarg."""
from .aggregate_numbagg import DEFAULT_FILL_VALUE
if not isinstance(func, str) or func not in DEFAULT_FILL_VALUE:
return result
# The condition needs to be
# len(found_groups) < size; if so we mask with fill_value (?)
default_fv = DEFAULT_FILL_VALUE[func]
needs_masking = fill_value is not None and not np.array_equal(fill_value, default_fv, equal_nan=True)
groups = np.arange(size)
if needs_masking:
mask = np.isin(groups, seen_groups, assume_unique=True, invert=True)
if mask.any():
result[..., groups[mask]] = fill_value
return result
def identity(x: T) -> T:
return x
def _issorted(arr: np.ndarray) -> bool:
return bool((arr[:-1] <= arr[1:]).all())
def _is_arg_reduction(func: T_Agg) -> bool:
if isinstance(func, str) and func in ["argmin", "argmax", "nanargmax", "nanargmin"]:
return True
if isinstance(func, Aggregation) and func.reduction_type == "argreduce":
return True
return False
def _is_minmax_reduction(func: T_Agg) -> bool:
return not _is_arg_reduction(func) and (isinstance(func, str) and ("max" in func or "min" in func))
def _is_first_last_reduction(func: T_Agg) -> bool:
if isinstance(func, Aggregation):
func = func.name
return func in ["nanfirst", "nanlast", "first", "last"]
def _get_expected_groups(by: T_By, sort: bool) -> T_ExpectIndex:
if is_duck_dask_array(by):
raise ValueError("Please provide expected_groups if not grouping by a numpy array.")
flatby = by.reshape(-1)
expected = pd.unique(flatby[notnull(flatby)])
return _convert_expected_groups_to_index((expected,), isbin=(False,), sort=sort)[0]
def _get_chunk_reduction(reduction_type: Literal["reduce", "argreduce"]) -> Callable:
if reduction_type == "reduce":
return chunk_reduce
elif reduction_type == "argreduce":
return chunk_argreduce
else:
raise ValueError(f"Unknown reduction type: {reduction_type}")
def is_nanlen(reduction: T_Func) -> bool:
return isinstance(reduction, str) and reduction == "nanlen"
def _move_reduce_dims_to_end(arr: np.ndarray, axis: T_Axes) -> np.ndarray:
"""Transpose `arr` by moving `axis` to the end."""
axis = tuple(axis)
order = tuple(ax for ax in np.arange(arr.ndim) if ax not in axis) + axis
arr = arr.transpose(order)
return arr
def _collapse_axis(arr: np.ndarray, naxis: int) -> np.ndarray:
"""Reshape so that the last `naxis` axes are collapsed to one axis."""
newshape = arr.shape[:-naxis] + (math.prod(arr.shape[-naxis:]),)
return arr.reshape(newshape)
@memoize
def _get_optimal_chunks_for_groups(chunks, labels):
chunkidx = np.cumsum(chunks) - 1
# what are the groups at chunk boundaries
labels_at_chunk_bounds = _unique(labels[chunkidx])
# what's the last index of all groups
last_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="last")
# what's the last index of groups at the chunk boundaries.
lastidx = last_indexes[labels_at_chunk_bounds]
if len(chunkidx) == len(lastidx) and (chunkidx == lastidx).all():
return chunks
first_indexes = npg.aggregate_numpy.aggregate(labels, np.arange(len(labels)), func="first")
firstidx = first_indexes[labels_at_chunk_bounds]
newchunkidx = [0]
for c, f, l in zip(chunkidx, firstidx, lastidx): # noqa
Δf = abs(c - f)
Δl = abs(c - l)
if c == 0 or newchunkidx[-1] > l:
continue
if Δf < Δl and f > newchunkidx[-1]:
newchunkidx.append(f)
else:
newchunkidx.append(l + 1)
if newchunkidx[-1] != chunkidx[-1] + 1:
newchunkidx.append(chunkidx[-1] + 1)
newchunks = np.diff(newchunkidx)
assert sum(newchunks) == sum(chunks)
return tuple(newchunks)
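# Illustrative example (not part of the original source; values worked out by hand):
# with chunks=(3, 3) and labels=[0, 0, 1, 1, 2, 2], the boundary at index 2 splits group 1,
# so the boundary is moved back to the first occurrence of group 1:
#   _get_optimal_chunks_for_groups((3, 3), np.array([0, 0, 1, 1, 2, 2]))  # -> (2, 4)
# i.e. the first chunk now holds all of group 0, and the second holds groups 1 and 2 intact.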
def _unique(a: np.ndarray) -> np.ndarray:
"""Much faster to use pandas unique and sort the results.
np.unique sorts before uniquifying and is slow."""
return np.sort(pd.unique(a.reshape(-1)))
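# Illustrative example (not in the original source):
#   _unique(np.array([[3, 1], [1, 2]]))  # -> array([1, 2, 3])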
def slices_from_chunks(chunks):
"""slightly modified from dask.array.core.slices_from_chunks to be lazy"""
cumdims = [tlz.accumulate(operator.add, bds, 0) for bds in chunks]
slices = (
(slice(s, s + dim) for s, dim in zip(starts, shapes)) for starts, shapes in zip(cumdims, chunks)
)
return product(*slices)
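# Illustrative example (not in the original source): for a (4, 3) array chunked as
# chunks=((2, 2), (3,)), this lazily yields one slice tuple per block:
# (slice(0, 2), slice(0, 3)) for the first block, then (slice(2, 4), slice(0, 3)).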
def _compute_label_chunk_bitmask(labels, chunks, nlabels):
def make_bitmask(rows, cols):
data = np.broadcast_to(np.array(1, dtype=np.uint8), rows.shape)
return csc_array((data, (rows, cols)), dtype=bool, shape=(nchunks, nlabels))
assert isinstance(labels, np.ndarray)
shape = tuple(sum(c) for c in chunks)
nchunks = math.prod(len(c) for c in chunks)
approx_chunk_size = math.prod(c[0] for c in chunks)
# Shortcut for 1D with size-1 chunks
if shape == (nchunks,):
rows_array = np.arange(nchunks)
cols_array = labels
mask = labels >= 0
return make_bitmask(rows_array[mask], cols_array[mask])
labels = np.broadcast_to(labels, shape[-labels.ndim :])
cols = []
ilabels = np.arange(nlabels)
def chunk_unique(labels, slicer, nlabels, label_is_present=None):
if label_is_present is None:
label_is_present = np.empty((nlabels + 1,), dtype=bool)
label_is_present[:] = False
subset = labels[slicer]
# This is quite a fast way to find unique integers when we know how many there are;
# inspired by a similar idea in numpy_groupies for first, last
# instead of explicitly finding uniques, repeatedly write True to the same location
label_is_present[subset.reshape(-1)] = True
# skip the -1 sentinel by slicing
# Faster than np.argwhere by a lot
uniques = ilabels[label_is_present[:-1]]
return uniques
# TODO: refine this heuristic.
# The general idea is that with the threadpool, we repeatedly allocate memory
# for `label_is_present`. We trade that off against the parallelism across number of chunks.
# For large enough number of chunks (relative to number of labels), it makes sense to
# suffer the extra allocation in exchange for parallelism.
THRESHOLD = 2
if nlabels < THRESHOLD * approx_chunk_size:
logger.debug(
"Using threadpool since num_labels %s < %d * chunksize %s",
nlabels,
THRESHOLD,
approx_chunk_size,
)
with ThreadPoolExecutor() as executor:
futures = [
executor.submit(chunk_unique, labels, slicer, nlabels)
for slicer in slices_from_chunks(chunks)
]
cols = tuple(f.result() for f in futures)
else:
logger.debug(
"Using serial loop since num_labels %s > %d * chunksize %s",
nlabels,
THRESHOLD,
approx_chunk_size,
)
cols = []
# Add one to handle the -1 sentinel value
label_is_present = np.empty((nlabels + 1,), dtype=bool)
for region in slices_from_chunks(chunks):
uniques = chunk_unique(labels, region, nlabels, label_is_present)
cols.append(uniques)
rows_array = np.repeat(np.arange(nchunks), tuple(len(col) for col in cols))
cols_array = np.concatenate(cols)
return make_bitmask(rows_array, cols_array)
# @memoize
def find_group_cohorts(
labels, chunks, expected_groups: None | pd.RangeIndex = None, merge: bool = False
) -> tuple[T_Method, dict]:
"""
Find group labels that occur together, aka "cohorts".
If available, results are cached in a 1MB cache managed by `cachey`.
This allows us to be quick when repeatedly calling groupby_reduce
for arrays with the same chunking (e.g. an xarray Dataset).
Parameters
----------
labels : np.ndarray
mD Array of integer group codes, factorized so that -1
represents NaNs.
chunks : tuple
chunks of the array being reduced
expected_groups: pd.RangeIndex (optional)
Used to extract the largest label expected
merge: bool (optional)
Whether to merge cohorts or not. Set to True if a user
specifies "cohorts" but other methods are preferable.
Returns
-------
preferred_method: {"blockwise", cohorts", "map-reduce"}
cohorts: dict_values
Iterable of cohorts
"""
# To do this, we must have values in memory so casting to numpy should be safe
labels = np.asarray(labels)
shape = tuple(sum(c) for c in chunks)
nchunks = math.prod(len(c) for c in chunks)
# assumes that `labels` are factorized
if expected_groups is None:
nlabels = labels.max() + 1
else:
nlabels = expected_groups[-1] + 1
# 1. Single chunk, blockwise always
if nchunks == 1:
return "blockwise", {(0,): list(range(nlabels))}
labels = np.broadcast_to(labels, shape[-labels.ndim :])
bitmask = _compute_label_chunk_bitmask(labels, chunks, nlabels)
CHUNK_AXIS, LABEL_AXIS = 0, 1
chunks_per_label = bitmask.sum(axis=CHUNK_AXIS)
# can happen when `expected_groups` is passed but not all labels are present
# (binning, resampling)
present_labels = np.arange(bitmask.shape[LABEL_AXIS])
present_labels_mask = chunks_per_label != 0
if not present_labels_mask.all():
present_labels = present_labels[present_labels_mask]
bitmask = bitmask[..., present_labels_mask]
chunks_per_label = chunks_per_label[present_labels_mask]
label_chunks = {
present_labels[idx].item(): bitmask.indices[slice(bitmask.indptr[idx], bitmask.indptr[idx + 1])]
for idx in range(bitmask.shape[LABEL_AXIS])
}
# Invert the label_chunks mapping so we know which labels occur together.
def invert(x) -> tuple[np.ndarray, ...]:
arr = label_chunks[x]
return tuple(arr.tolist())
chunks_cohorts = tlz.groupby(invert, label_chunks.keys())
# 2. Every group is contained to one block, use blockwise here.
if bitmask.shape[CHUNK_AXIS] == 1 or (chunks_per_label == 1).all():
logger.debug("find_group_cohorts: blockwise is preferred.")
return "blockwise", chunks_cohorts
# 3. Perfectly chunked so there is only a single cohort
if len(chunks_cohorts) == 1:
logger.debug("Only found a single cohort. 'map-reduce' is preferred.")
return "map-reduce", chunks_cohorts if merge else {}
# 4. Our dataset has chunksize one along the reduction axis.
single_chunks = all(all(a == 1 for a in ac) for ac in chunks)
# 5. Every chunk only has a single group, but that group might extend across multiple chunks
one_group_per_chunk = (bitmask.sum(axis=LABEL_AXIS) == 1).all()
# 6. Existing cohorts don't overlap, great for time grouping with perfect chunking
no_overlapping_cohorts = (np.bincount(np.concatenate(tuple(chunks_cohorts.keys()))) == 1).all()
if one_group_per_chunk or single_chunks or no_overlapping_cohorts:
logger.debug("find_group_cohorts: cohorts is preferred, chunking is perfect.")
return "cohorts", chunks_cohorts
# We'll use containment to measure degree of overlap between labels.
# Containment C = |Q & S| / |Q|
# - |X| is the cardinality of set X
# - Q is the query set being tested
# - S is the existing set
# The bitmask matrix S allows us to calculate this pretty efficiently using a dot product.
# S.T @ S / chunks_per_label
#
# We treat the sparsity(C) = (nnz/size) as a summary measure of the net overlap.
# 1. For high enough sparsity, there is a lot of overlap and we should use "map-reduce".
# 2. When labels are uniformly distributed amongst all chunks
# (and number of labels < chunk size), sparsity is 1.
# 3. Time grouping cohorts (e.g. dayofyear) appear as lines in this matrix.
# 4. When there are no overlaps at all between labels, containment is a block diagonal matrix
# (approximately).
#
# However computing S.T @ S can still be the slowest step, especially if S
# is not particularly sparse. Empirically the sparsity( S.T @ S ) > min(1, 2 x sparsity(S)).
# So we use sparsity(S) as a shortcut.
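# Toy example of the containment computation (illustrative, not from the original source).
# Bitmask S with rows = chunks, columns = labels:
#     S = [[1, 0],         chunks_per_label = [2, 2]
#          [1, 0],
#          [0, 1],         S.T @ S = [[2, 0],
#          [0, 1]]                    [0, 2]]
#     containment = (S.T @ S) / chunks_per_label = [[1, 0],
#                                                   [0, 1]]
# Label 0 never shares a chunk with label 1, so containment is (block) diagonal and the
# two labels belong to different cohorts. If the labels overlapped heavily, the
# off-diagonal entries would approach 1 and those labels would be merged below.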
MAX_SPARSITY_FOR_COHORTS = 0.4 # arbitrary
sparsity = bitmask.nnz / math.prod(bitmask.shape)
preferred_method: Literal["map-reduce"] | Literal["cohorts"]
logger.debug("sparsity of bitmask is %s, threshold is %s", sparsity, MAX_SPARSITY_FOR_COHORTS)
# 7. Groups seem fairly randomly distributed, use "map-reduce".
if sparsity > MAX_SPARSITY_FOR_COHORTS:
if not merge:
logger.debug("find_group_cohorts: bitmask sparsity=%s, merge=False, choosing 'map-reduce'", sparsity)
return "map-reduce", {}
preferred_method = "map-reduce"
else:
preferred_method = "cohorts"
# Note: While A.T @ A is a symmetric matrix, the division by chunks_per_label
# makes it non-symmetric.
asfloat = bitmask.astype(float)
containment = csr_array(asfloat.T @ asfloat / chunks_per_label)
logger.debug("sparsity of containment matrix is %s", containment.nnz / math.prod(containment.shape))
# Next we for-loop over groups and merge those that are quite similar.
# Use a threshold on containment to always force some merging.
# Note that we do not use the filtered containment matrix for estimating "sparsity"
# because it is a bit hard to reason about.
MIN_CONTAINMENT = 0.75 # arbitrary
mask = containment.data < MIN_CONTAINMENT
# Now we also know "exact cohorts" -- cohorts whose constituent groups
# occur in exactly the same chunks. We only need to examine one member of each such cohort.
# Skip the others by first looping over the exact cohorts, and zero out those rows.
repeated = np.concatenate([v[1:] for v in chunks_cohorts.values()]).astype(int)
repeated_idx = np.searchsorted(present_labels, repeated)
for i in repeated_idx:
mask[containment.indptr[i] : containment.indptr[i + 1]] = True
containment.data[mask] = 0
containment.eliminate_zeros()
# Figure out all the labels we need to loop over later
n_overlapping_labels = containment.astype(bool).sum(axis=1)
order = np.argsort(n_overlapping_labels, kind="stable")[::-1]
# Order is such that we iterate over labels, beginning with those with most overlaps
# Also filter out any "exact" cohorts
order = order[n_overlapping_labels[order] > 0]
logger.debug("find_group_cohorts: merging cohorts")
merged_cohorts = {}
merged_keys = set()
for rowidx in order:
if present_labels[rowidx] in merged_keys:
continue
cohidx = containment.indices[slice(containment.indptr[rowidx], containment.indptr[rowidx + 1])]
cohort_ = present_labels[cohidx]
cohort = [elem.item() for elem in cohort_ if elem not in merged_keys]
if not cohort:
continue
merged_keys.update(cohort)
allchunks = (label_chunks[member].tolist() for member in cohort)
chunk = tuple(set(itertools.chain(*allchunks)))
merged_cohorts[chunk] = cohort
actual_ngroups = np.concatenate(tuple(merged_cohorts.values())).size
expected_ngroups = present_labels.size
assert len(merged_keys) == actual_ngroups
assert expected_ngroups == actual_ngroups, (expected_ngroups, actual_ngroups)
# sort by first label in cohort
# This will help when sort=True (default)
# and we have to resort the dask array
as_sorted = dict(sorted(merged_cohorts.items(), key=lambda kv: kv[1][0]))
return preferred_method, as_sorted
def rechunk_for_cohorts(
array: DaskArray,
axis: T_Axis,
labels: np.ndarray,
force_new_chunk_at: Sequence,
chunksize: int | None = None,
ignore_old_chunks: bool = False,
debug: bool = False,
) -> DaskArray:
"""
Rechunks array so that each new chunk contains groups that always occur together.
Parameters
----------
array : dask.array.Array
array to rechunk
axis : int
Axis to rechunk
labels : np.ndarray
1D Group labels to align chunks with. This routine works
well when ``labels`` has repeating patterns: e.g.
``1, 2, 3, 1, 2, 3, 4, 1, 2, 3`` though there is no requirement
that the pattern must contain sequences.
force_new_chunk_at : Sequence
Labels at which we always start a new chunk. For
the example ``labels`` array, this would be `1`.
chunksize : int, optional
nominal chunk size. Chunk size is exceeded when the label
in ``force_new_chunk_at`` is less than ``chunksize//2`` elements away.
If None, uses median chunksize along axis.
Returns
-------
dask.array.Array
rechunked array
"""
if chunksize is None:
chunksize = np.median(array.chunks[axis]).astype(int)
if len(labels) != array.shape[axis]:
raise ValueError(
"labels must be equal to array.shape[axis]. "
f"Received length {len(labels)}. Expected length {array.shape[axis]}"
)
force_new_chunk_at = _atleast_1d(force_new_chunk_at)
oldchunks = array.chunks[axis]
oldbreaks = np.insert(np.cumsum(oldchunks), 0, 0)
if debug:
labels_at_breaks = labels[oldbreaks[:-1]]
print(labels_at_breaks[:40])
isbreak = np.isin(labels, force_new_chunk_at)
if not np.any(isbreak):
raise ValueError("One or more labels in ``force_new_chunk_at`` not present in ``labels``.")
divisions = []
counter = 1
for idx, lab in enumerate(labels):
if lab in force_new_chunk_at or idx == 0:
divisions.append(idx)
counter = 1
continue
next_break = np.nonzero(isbreak[idx:])[0]
if next_break.any():
next_break_is_close = next_break[0] <= chunksize // 2
else:
next_break_is_close = False
if (not ignore_old_chunks and idx in oldbreaks) or (counter >= chunksize and not next_break_is_close):
divisions.append(idx)
counter = 1
continue
counter += 1
divisions.append(len(labels))
if debug:
labels_at_breaks = labels[divisions[:-1]]
print(labels_at_breaks[:40])
newchunks = tuple(np.diff(divisions))
if debug:
print(divisions[:10], newchunks[:10])
print(divisions[-10:], newchunks[-10:])
assert sum(newchunks) == len(labels)
if newchunks == array.chunks[axis]:
return array
else:
return array.rechunk({axis: newchunks})
def rechunk_for_blockwise(array: DaskArray, axis: T_Axis, labels: np.ndarray) -> DaskArray:
"""
Rechunks array so that group boundaries line up with chunk boundaries, allowing
embarrassingly parallel group reductions.
This only works when the groups are sequential
(e.g. labels = ``[0,0,0,1,1,1,1,2,2]``).
Such patterns occur when using ``.resample``.
Parameters
----------
array : DaskArray
Array to rechunk
axis : int
Axis along which to rechunk the array.
labels : np.ndarray
Group labels
Returns
-------
DaskArray
Rechunked array
"""
# TODO: this should be unnecessary?
labels = factorize_((labels,), axes=())[0]
chunks = array.chunks[axis]
newchunks = _get_optimal_chunks_for_groups(chunks, labels)
if newchunks == chunks:
return array
else:
return array.rechunk({axis: newchunks})
def reindex_(
array: np.ndarray,
from_,
to,
fill_value: Any = None,
axis: T_Axis = -1,
promote: bool = False,
) -> np.ndarray:
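# Reindexes ``array`` along ``axis`` from the labels in ``from_`` to the labels in ``to``,
# inserting ``fill_value`` where a label in ``to`` is missing from ``from_``.
# Illustrative example (not in the original source):
#   reindex_(np.array([1.0, 2.0]), from_=[10, 20], to=pd.Index([10, 20, 30]), fill_value=0)
#   # -> array([1., 2., 0.])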
if not isinstance(to, pd.Index):
if promote:
to = pd.Index(to)
else:
raise ValueError("reindex requires a pandas.Index or promote=True")
if to.ndim > 1:
raise ValueError(f"Cannot reindex to a multidimensional array: {to}")
if array.shape[axis] == 0:
# all groups were NaN
reindexed = np.full(array.shape[:-1] + (len(to),), fill_value, dtype=array.dtype)
return reindexed
from_ = pd.Index(from_)
# short-circuit for trivial case
if from_.equals(to):
return array
if from_.dtype.kind == "O" and isinstance(from_[0], tuple):
raise NotImplementedError(
"Currently does not support reindexing with object arrays of tuples. "
"These occur when grouping by multi-indexed variables in xarray."
)
idx = from_.get_indexer(to)
indexer = [slice(None, None)] * array.ndim
indexer[axis] = idx
reindexed = array[tuple(indexer)]
if any(idx == -1):
if fill_value is None:
raise ValueError("Filling is required. fill_value cannot be None.")
indexer[axis] = idx == -1
# This allows us to match xarray's type promotion rules
if fill_value is xrdtypes.NA or isnull(fill_value):
new_dtype, fill_value = xrdtypes.maybe_promote(reindexed.dtype)
reindexed = reindexed.astype(new_dtype, copy=False)
reindexed[tuple(indexer)] = fill_value
return reindexed
def offset_labels(labels: np.ndarray, ngroups: int) -> tuple[np.ndarray, int]:
"""
Offset group labels by dimension. This is used when we
reduce over a subset of the dimensions of by. It assumes that the reductions
dimensions have been flattened in the last dimension
Copied from xhistogram &
https://stackoverflow.com/questions/46256279/bin-elements-per-row-vectorized-2d-bincount-for-numpy
"""
assert labels.ndim > 1
offset: np.ndarray = (
labels + np.arange(math.prod(labels.shape[:-1])).reshape((*labels.shape[:-1], -1)) * ngroups
)
# -1 indicates NaNs. preserve these otherwise we aggregate in the wrong groups!
offset[labels == -1] = -1
size: int = math.prod(labels.shape[:-1]) * ngroups
return offset, size
@overload
def factorize_(
by: T_Bys,
axes: T_Axes,
*,
fastpath: Literal[True],
expected_groups: T_ExpectIndexOptTuple | None = None,
reindex: bool = False,
sort: bool = True,
) -> tuple[np.ndarray, tuple[np.ndarray, ...], tuple[int, ...], int, int, None]: ...
@overload
def factorize_(
by: T_Bys,
axes: T_Axes,
*,
expected_groups: T_ExpectIndexOptTuple | None = None,
reindex: bool = False,
sort: bool = True,
fastpath: Literal[False] = False,
) -> tuple[np.ndarray, tuple[np.ndarray, ...], tuple[int, ...], int, int, FactorProps]: ...
@overload
def factorize_(
by: T_Bys,
axes: T_Axes,
*,
expected_groups: T_ExpectIndexOptTuple | None = None,
reindex: bool = False,
sort: bool = True,
fastpath: bool = False,
) -> tuple[np.ndarray, tuple[np.ndarray, ...], tuple[int, ...], int, int, FactorProps | None]: ...
def factorize_(
by: T_Bys,
axes: T_Axes,
*,
expected_groups: T_ExpectIndexOptTuple | None = None,
reindex: bool = False,
sort: bool = True,
fastpath: bool = False,
) -> tuple[np.ndarray, tuple[np.ndarray, ...], tuple[int, ...], int, int, FactorProps | None]:
"""
Returns an array of integer codes for groups (and associated data)
by wrapping pd.cut and pd.factorize (depending on isbin).
This method handles reindex and sort so that we don't spend time reindexing / sorting
a possibly large results array. Instead we set up the appropriate integer codes (group_idx)
so that the results come out in the appropriate order.
"""
if expected_groups is None:
expected_groups = (None,) * len(by)
factorized = []
found_groups = []
for groupvar, expect in zip(by, expected_groups):
flat = groupvar.reshape(-1)
if isinstance(expect, pd.RangeIndex):
# idx is a view of the original `by` array
# copy here so we don't have a race condition with the
# group_idx[nanmask] = nan_sentinel assignment later
# this is important in shared-memory parallelism with dask
# TODO: figure out how to avoid this
idx = flat.copy()
found_groups.append(np.array(expect))
# TODO: fix by using masked integers
idx[idx > expect[-1]] = -1
elif isinstance(expect, pd.IntervalIndex):
if expect.closed == "both":
raise NotImplementedError
bins = np.concatenate([expect.left.to_numpy(), expect.right.to_numpy()[[-1]]])
# digitize is 0 or idx.max() for values outside the bounds of all intervals
# make it behave like pd.cut which uses -1:
if len(bins) > 1:
right = expect.closed_right
idx = np.digitize(
flat,
bins=bins.view(np.int64) if bins.dtype.kind == "M" else bins,
right=right,
)
idx -= 1
within_bins = flat <= bins.max() if right else flat < bins.max()
idx[~within_bins] = -1
else:
idx = np.zeros_like(flat, dtype=np.intp) - 1
found_groups.append(np.array(expect))
else:
if expect is not None and reindex:
sorter = np.argsort(expect)
groups = expect[(sorter,)] if sort else expect
idx = np.searchsorted(expect, flat, sorter=sorter)
mask = ~np.isin(flat, expect) | isnull(flat) | (idx == len(expect))
if not sort:
# idx is the index into the sorted array.
# if we didn't want sorting, unsort it back
idx[(idx == len(expect),)] = -1
idx = sorter[(idx,)]
idx[mask] = -1
else:
idx, groups = pd.factorize(flat, sort=sort)
found_groups.append(np.array(groups))
factorized.append(idx.reshape(groupvar.shape))
grp_shape = tuple(len(grp) for grp in found_groups)
ngroups = math.prod(grp_shape)
if len(by) > 1:
group_idx = np.ravel_multi_index(factorized, grp_shape, mode="wrap")
# NaNs; as well as values outside the bins are coded by -1
# Restore these after the raveling
nan_by_mask = reduce(np.logical_or, [(f == -1) for f in factorized])
group_idx[nan_by_mask] = -1
else:
group_idx = factorized[0]
if fastpath:
return group_idx, tuple(found_groups), grp_shape, ngroups, ngroups, None
if len(axes) == 1 and groupvar.ndim > 1:
# Not reducing along all dimensions of by
# this is OK because for 3D by and axis=(1,2),
# we collapse to a 2D by and axis=-1
offset_group = True
group_idx, size = offset_labels(group_idx.reshape(by[0].shape), ngroups)
else:
size = ngroups
offset_group = False
# numpy_groupies cannot deal with group_idx = -1
# so we'll use ngroups as the sentinel instead
# note we cannot simply remove the NaN locations;
# that would mess up argmax, argmin
nan_sentinel = size if offset_group else ngroups
nanmask = group_idx == -1
if nanmask.any():
# bump it up so there's a place to assign values to the nan_sentinel index
size += 1
group_idx[nanmask] = nan_sentinel
props = FactorProps(offset_group, nan_sentinel, nanmask)
return group_idx, tuple(found_groups), grp_shape, ngroups, size, props
def chunk_argreduce(
array_plus_idx: tuple[np.ndarray, ...],
by: np.ndarray,
func: T_Funcs,
expected_groups: pd.Index | None,
axis: T_AxesOpt,
fill_value: T_FillValues,
dtype: T_Dtypes = None,
reindex: bool = False,
engine: T_Engine = "numpy",
sort: bool = True,
user_dtype=None,
) -> IntermediateDict:
"""
Per-chunk arg reduction.
Expects a tuple of (array, index along reduction axis). Inspired by
dask.array.reductions.argtopk
"""
array, idx = array_plus_idx
by = np.broadcast_to(by, array.shape)
results = chunk_reduce(
array,
by,
func,
expected_groups=None,
axis=axis,
fill_value=fill_value,
dtype=dtype,
engine=engine,
sort=sort,
user_dtype=user_dtype,
)
if not isnull(results["groups"]).all():
idx = np.broadcast_to(idx, array.shape)
# array, by get flattened to 1D before passing to npg
# so the indexes need to be unraveled
newidx = np.unravel_index(results["intermediates"][1], array.shape)
# Now index into the actual "global" indexes `idx`
results["intermediates"][1] = idx[newidx]
if reindex and expected_groups is not None:
results["intermediates"][1] = reindex_(
results["intermediates"][1],
results["groups"].squeeze(),
expected_groups,
fill_value=0,
)
assert results["intermediates"][0].shape == results["intermediates"][1].shape
return results
def chunk_reduce(
array: np.ndarray,
by: np.ndarray,
func: T_Funcs,
expected_groups: pd.Index | None,
axis: T_AxesOpt = None,
fill_value: T_FillValues = None,
dtype: T_Dtypes = None,
reindex: bool = False,
engine: T_Engine = "numpy",
kwargs: Sequence[dict] | None = None,
sort: bool = True,
user_dtype=None,
) -> IntermediateDict:
"""
Wrapper for numpy_groupies aggregate that supports nD ``array`` and
mD ``by``.
Core groupby reduction using numpy_groupies. Uses ``pandas.factorize`` to factorize
``by``. Offsets the groups if not reducing along all dimensions of ``by``.
Always ravels ``by`` to 1D, flattens appropriate dimensions of array.
When dask arrays are passed to groupby_reduce, this function is called on every
block.
Parameters
----------
array : numpy.ndarray
Array of values to be reduced
by : numpy.ndarray
Array to group by.
func : str or Callable or Sequence[str] or Sequence[Callable]
Name of reduction or function, passed to numpy_groupies.
Supports multiple reductions.
axis : (optional) int or Sequence[int]
If None, reduce along all dimensions of array.
Else reduce along specified axes.
Returns
-------
dict
"""
funcs = _atleast_1d(func)
nfuncs = len(funcs)
dtypes = _atleast_1d(dtype, nfuncs)
fill_values = _atleast_1d(fill_value, nfuncs)
kwargss = _atleast_1d({}, nfuncs) if kwargs is None else kwargs
if isinstance(axis, Sequence):
axes: T_Axes = axis
nax = len(axes)
else:
nax = by.ndim
axes = () if axis is None else (axis,) * nax
assert by.ndim <= array.ndim
final_array_shape = array.shape[:-nax] + (1,) * (nax - 1)
final_groups_shape = (1,) * (nax - 1)
if 1 < nax < by.ndim:
# when axis is a tuple
# collapse and move reduction dimensions to the end
by = _collapse_axis(by, nax)
array = _collapse_axis(array, nax)
axes = (-1,)
nax = 1
# if indices=[2,2,2], npg assumes groups are (0, 1, 2);
# and will return a result that is bigger than necessary
# avoid by factorizing again so indices=[2,2,2] is changed to
# indices=[0,0,0]. This is necessary when combining block results
# factorize can handle strings etc unlike digitize
group_idx, grps, found_groups_shape, _, size, props = factorize_(
(by,), axes, expected_groups=(expected_groups,), reindex=reindex, sort=sort
)
(groups,) = grps
# do this *before* possible broadcasting below.
# factorize_ has already taken care of offsetting
if engine == "numbagg":
seen_groups = _unique(group_idx)
order = "C"
if nax > 1:
needs_broadcast = any(
group_idx.shape[ax] != array.shape[ax] and group_idx.shape[ax] == 1 for ax in range(-nax, 0)
)
if needs_broadcast:
# This is the dim=... case, it's a lot faster to ravel group_idx
# in fortran order since group_idx is then sorted
# I'm seeing 400ms -> 23ms for engine="flox"
# Of course we are slower to ravel `array` but we avoid argsorting
# both `array` *and* `group_idx` in _prepare_for_flox
group_idx = np.broadcast_to(group_idx, array.shape[-by.ndim :])
if engine == "flox":
group_idx = group_idx.reshape(-1, order="F")
order = "F"
# always reshape to 1D along group dimensions
newshape = array.shape[: array.ndim - by.ndim] + (math.prod(array.shape[-by.ndim :]),)
array = array.reshape(newshape, order=order) # type: ignore[call-overload]
group_idx = group_idx.reshape(-1)
assert group_idx.ndim == 1
empty = np.all(props.nanmask)
hasnan = np.any(props.nanmask)
results: IntermediateDict = {"groups": [], "intermediates": []}
if reindex and expected_groups is not None:
# TODO: what happens with binning here?
results["groups"] = expected_groups.to_numpy()
else:
if empty:
results["groups"] = np.array([np.nan])
else:
results["groups"] = groups
# npg's argmax ensures that the index of the first "max" is returned when there
# are multiple elements equal to the "max". Sorting messes this up totally.
# so we skip this for argreductions
if engine == "flox":
# is_arg_reduction = any("arg" in f for f in func if isinstance(f, str))
# if not is_arg_reduction:
group_idx, array, _ = _prepare_for_flox(group_idx, array)
final_array_shape += results["groups"].shape
final_groups_shape += results["groups"].shape
# we commonly have func=(..., "nanlen", "nanlen") when
# counts are needed for the final result as well as for masking
# optimize that out.
previous_reduction: T_Func = ""
for reduction, fv, kw, dt in zip(funcs, fill_values, kwargss, dtypes):
if empty:
result = np.full(shape=final_array_shape, fill_value=fv)
elif is_nanlen(reduction) and is_nanlen(previous_reduction):
result = results["intermediates"][-1]
else:
# fill_value here is necessary when reducing with "offset" groups
kw_func = dict(size=size, dtype=dt, fill_value=fv)
kw_func.update(kw)
if callable(reduction):
# passing a custom reduction for npg to apply per-group is really slow!
# So this `reduction` has to do the groupby-aggregation
result = reduction(group_idx, array, **kw_func)
else:
result = generic_aggregate(
group_idx, array, axis=-1, engine=engine, func=reduction, **kw_func
).astype(dt, copy=False)
if engine == "numbagg":
result = _postprocess_numbagg(
result,
func=reduction,
size=size,
fill_value=fv,
# Unfortunately, we cannot reuse found_groups, it has not
# been "offset" and is really expected_groups in nearly all cases
seen_groups=seen_groups,
)
if hasnan:
# remove NaN group label which should be last
result = result[..., :-1]
# TODO: Figure out how to generalize this
if reduction in ("quantile", "nanquantile"):
new_dims_shape = tuple(dim.size for dim in quantile_new_dims_func(**kw) if not dim.is_scalar)
else:
new_dims_shape = tuple()
result = result.reshape(new_dims_shape + final_array_shape[:-1] + found_groups_shape)
results["intermediates"].append(result)
previous_reduction = reduction
results["groups"] = np.broadcast_to(results["groups"], final_groups_shape)
return results
def _squeeze_results(results: IntermediateDict, axis: T_Axes) -> IntermediateDict:
# at the end we squeeze out extra dims
groups = results["groups"]
newresults: IntermediateDict = {"groups": [], "intermediates": []}
newresults["groups"] = np.squeeze(
groups, axis=tuple(ax for ax in range(groups.ndim - 1) if groups.shape[ax] == 1)
)
for v in results["intermediates"]:
squeeze_ax = tuple(ax for ax in sorted(axis)[:-1] if v.shape[ax] == 1)
newresults["intermediates"].append(np.squeeze(v, axis=squeeze_ax) if squeeze_ax else v)
return newresults
def _finalize_results(
results: IntermediateDict,
agg: Aggregation,
axis: T_Axes,
expected_groups: pd.Index | None,
fill_value: Any,
reindex: bool,
) -> FinalResultsDict:
"""Finalize results by
1. Squeezing out dummy dimensions
2. Calling agg.finalize with intermediate results
3. Mask using counts and fill with user-provided fill_value.
4. reindex to expected_groups
"""
squeezed = _squeeze_results(results, tuple(agg.num_new_vector_dims + ax for ax in axis))
min_count = agg.min_count
if min_count > 0:
counts = squeezed["intermediates"][-1]
squeezed["intermediates"] = squeezed["intermediates"][:-1]
# finalize step
finalized: FinalResultsDict = {}
if agg.finalize is None:
finalized[agg.name] = squeezed["intermediates"][0]
else:
finalized[agg.name] = agg.finalize(*squeezed["intermediates"], **agg.finalize_kwargs)
if min_count > 0:
count_mask = counts < min_count
if count_mask.any():
# For one, count_mask.any() prevents promoting bool to dtype(fill_value)
# unless necessary.
if fill_value is None:
raise ValueError("Filling is required but fill_value is None.")
# This allows us to match xarray's type promotion rules
if fill_value is xrdtypes.NA:
new_dtype, fill_value = xrdtypes.maybe_promote(finalized[agg.name].dtype)
finalized[agg.name] = finalized[agg.name].astype(new_dtype)
finalized[agg.name] = np.where(count_mask, fill_value, finalized[agg.name])
# Final reindexing has to be here to be lazy
if not reindex and expected_groups is not None:
finalized[agg.name] = reindex_(
finalized[agg.name],
squeezed["groups"],
expected_groups,
fill_value=fill_value,
)
finalized["groups"] = expected_groups.to_numpy()
else:
finalized["groups"] = squeezed["groups"]
finalized[agg.name] = finalized[agg.name].astype(agg.dtype["final"], copy=False)
return finalized
def _aggregate(
x_chunk,
combine: Callable,
agg: Aggregation,
expected_groups: pd.Index | None,
axis: T_Axes,
keepdims,
fill_value: Any,
reindex: bool,
) -> FinalResultsDict:
"""Final aggregation step of tree reduction"""
results = combine(x_chunk, agg, axis, keepdims, is_aggregate=True)
return _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
def _expand_dims(results: IntermediateDict) -> IntermediateDict:
results["intermediates"] = tuple(np.expand_dims(array, DUMMY_AXIS) for array in results["intermediates"])
return results
def _find_unique_groups(x_chunk) -> np.ndarray:
from dask.base import flatten
from dask.utils import deepmap
unique_groups = _unique(np.asarray(tuple(flatten(deepmap(listify_groups, x_chunk)))))
unique_groups = unique_groups[notnull(unique_groups)]
if len(unique_groups) == 0:
unique_groups = np.array([np.nan])
return unique_groups
def _simple_combine(
x_chunk,
agg: Aggregation,
axis: T_Axes,
keepdims: bool,
reindex: bool,
is_aggregate: bool = False,
) -> IntermediateDict:
"""
'Simple' combination of blockwise results.
1. After the blockwise groupby-reduce, all blocks contain a value for all possible groups,
and are of the same shape; i.e. reindex must have been True
2. _expand_dims was used to insert an extra axis DUMMY_AXIS
3. Here we concatenate along DUMMY_AXIS, and then call the combine function along
DUMMY_AXIS
4. At the final aggregate step, we squeeze out DUMMY_AXIS
"""
from dask.array.core import deepfirst
from dask.utils import deepmap
if not reindex:
# We didn't reindex at the blockwise step
# So now reindex before combining by reducing along DUMMY_AXIS
unique_groups = _find_unique_groups(x_chunk)
x_chunk = deepmap(
partial(reindex_intermediates, agg=agg, unique_groups=unique_groups),
x_chunk,
)
else:
unique_groups = deepfirst(x_chunk)["groups"]
results: IntermediateDict = {"groups": unique_groups}
results["intermediates"] = []
axis_ = axis[:-1] + (DUMMY_AXIS,)
for idx, combine in enumerate(agg.simple_combine):
array = _conc2(x_chunk, key1="intermediates", key2=idx, axis=axis_)
assert array.ndim >= 2
with warnings.catch_warnings():
warnings.filterwarnings("ignore", r"All-NaN (slice|axis) encountered")
assert callable(combine)
result = combine(array, axis=axis_, keepdims=True)
if is_aggregate:
# squeeze out DUMMY_AXIS if this is the last step i.e. called from _aggregate
result = result.squeeze(axis=DUMMY_AXIS)
results["intermediates"].append(result)
return results
def _conc2(x_chunk, key1, key2=slice(None), axis: T_Axes | None = None) -> np.ndarray:
"""copied from dask.array.reductions.mean_combine"""
from dask.array.core import _concatenate2
from dask.utils import deepmap
mapped = deepmap(lambda x: x[key1][key2], x_chunk)
return _concatenate2(mapped, axes=axis)
# This doesn't seem to improve things at all; and some tests fail...
# from dask.array.core import concatenate3
# for _ in range(mapped[0].ndim-1):
# mapped = [mapped]
# return concatenate3(mapped)
def reindex_intermediates(x: IntermediateDict, agg: Aggregation, unique_groups) -> IntermediateDict:
new_shape = x["groups"].shape[:-1] + (len(unique_groups),)
newx: IntermediateDict = {"groups": np.broadcast_to(unique_groups, new_shape)}
newx["intermediates"] = tuple(
reindex_(
v,
from_=np.atleast_1d(x["groups"].squeeze()),
to=pd.Index(unique_groups),
fill_value=f,
)
for v, f in zip(x["intermediates"], agg.fill_value["intermediate"])
)
return newx
def listify_groups(x: IntermediateDict):
return list(np.atleast_1d(x["groups"].squeeze()))
def _grouped_combine(
x_chunk,
agg: Aggregation,
axis: T_Axes,
keepdims: bool,
engine: T_Engine,
is_aggregate: bool = False,
sort: bool = True,
) -> IntermediateDict:
"""Combine intermediates step of tree reduction."""
from dask.utils import deepmap
combine = agg.combine
if isinstance(x_chunk, dict):
# Only one block at final step; skip one extra groupby
return x_chunk
if len(axis) != 1:
# when there's only a single axis of reduction, we can just concatenate later;
# reindexing is unnecessary in that case
# I bet we can minimize the amount of reindexing for mD reductions too, but it's complicated
unique_groups = _find_unique_groups(x_chunk)
x_chunk = deepmap(
partial(reindex_intermediates, agg=agg, unique_groups=unique_groups),
x_chunk,
)
# these are negative axis indices useful for concatenating the intermediates
neg_axis = tuple(range(-len(axis), 0))
groups = _conc2(x_chunk, "groups", axis=neg_axis)
if agg.reduction_type == "argreduce":
# If "nanlen" was added for masking later, we need to account for that
if agg.chunk[-1] == "nanlen":
slicer = slice(None, -1)
else:
slicer = slice(None, None)
# We need to send the intermediate array values & indexes at the same time
# intermediates are (value e.g. max, index e.g. argmax, counts)
array_idx = tuple(_conc2(x_chunk, key1="intermediates", key2=idx, axis=axis) for idx in (0, 1))
# for a single element along axis, we don't want to run the argreduction twice
# This happens when we are reducing along an axis with a single chunk.
avoid_reduction = array_idx[0].shape[axis[0]] == 1
if avoid_reduction:
results: IntermediateDict = {
"groups": groups,
"intermediates": list(array_idx),
}
else:
results = chunk_argreduce(
array_idx,
groups,
# count gets treated specially next
func=combine[slicer], # type: ignore[arg-type]
axis=axis,
expected_groups=None,
fill_value=agg.fill_value["intermediate"][slicer],
dtype=agg.dtype["intermediate"][slicer],
engine=engine,
sort=sort,
)
if agg.chunk[-1] == "nanlen":
counts = _conc2(x_chunk, key1="intermediates", key2=2, axis=axis)
if avoid_reduction:
results["intermediates"].append(counts)
else:
# sum the counts
results["intermediates"].append(
chunk_reduce(
counts,
groups,
func="sum",
axis=axis,
expected_groups=None,
fill_value=(0,),
dtype=(np.intp,),
engine=engine,
sort=sort,
user_dtype=agg.dtype["user"],
)["intermediates"][0]
)
elif agg.reduction_type == "reduce":
# Here we reduce the intermediates individually
results = {"groups": None, "intermediates": []}
for idx, (combine_, fv, dtype) in enumerate(
zip(combine, agg.fill_value["intermediate"], agg.dtype["intermediate"])
):
assert combine_ is not None
array = _conc2(x_chunk, key1="intermediates", key2=idx, axis=axis)
if array.shape[-1] == 0:
# all empty when combined
results["intermediates"].append(np.empty(shape=(1,) * (len(axis) - 1) + (0,), dtype=dtype))
results["groups"] = np.empty(shape=(1,) * (len(neg_axis) - 1) + (0,), dtype=groups.dtype)
else:
_results = chunk_reduce(
array,
groups,
func=combine_,
axis=axis,
expected_groups=None,
fill_value=(fv,),
dtype=(dtype,),
engine=engine,
sort=sort,
user_dtype=agg.dtype["user"],
)
results["intermediates"].append(*_results["intermediates"])
results["groups"] = _results["groups"]
return results
def _reduce_blockwise(
array,
by,
agg: Aggregation,
*,
axis: T_Axes,
expected_groups,
fill_value: Any,
engine: T_Engine,
sort: bool,
reindex: bool,
) -> FinalResultsDict:
"""
Blockwise groupby reduction that produces the final result. This code path is
also used for non-dask array aggregations.
"""
# for pure numpy grouping, we just use npg directly and avoid "finalizing"
# (agg.finalize = None). We still need to do the reindexing step in finalize
# so that everything matches the dask version.
agg.finalize = None
assert agg.finalize_kwargs is not None
finalize_kwargs_: tuple[dict[Any, Any], ...] = (agg.finalize_kwargs,) + ({},) + ({},)
results = chunk_reduce(
array,
by,
func=agg.numpy,
axis=axis,
expected_groups=expected_groups,
# This fill_value should only apply to groups that only contain NaN observations
# BUT there is funkiness when axis is a subset of all possible values
# (see below)
fill_value=agg.fill_value["numpy"],
dtype=agg.dtype["numpy"],
kwargs=finalize_kwargs_,
engine=engine,
sort=sort,
reindex=reindex,
user_dtype=agg.dtype["user"],
)
if _is_arg_reduction(agg):
results["intermediates"][0] = np.unravel_index(results["intermediates"][0], array.shape)[-1]
result = _finalize_results(results, agg, axis, expected_groups, fill_value=fill_value, reindex=reindex)
return result
def _normalize_indexes(array: DaskArray, flatblocks, blkshape) -> tuple:
"""
.blocks accessor can only accept one iterable at a time,
but can handle multiple slices.
To minimize tasks and layers, we normalize to produce slices
along as many axes as possible, and then repeatedly apply
any remaining iterables in a loop.
TODO: move this upstream
"""
unraveled = np.unravel_index(flatblocks, blkshape)
normalized: list[int | slice | list[int]] = []
for ax, idx in enumerate(unraveled):
i = _unique(idx).squeeze()
if i.ndim == 0:
normalized.append(i.item())
else:
if np.array_equal(i, np.arange(blkshape[ax])):
normalized.append(slice(None))
elif np.array_equal(i, np.arange(i[0], i[-1] + 1)):
normalized.append(slice(i[0], i[-1] + 1))
else:
normalized.append(list(i))
full_normalized = (slice(None),) * (array.ndim - len(normalized)) + tuple(normalized)
# has no iterables
noiter = list(i if not hasattr(i, "__len__") else slice(None) for i in full_normalized)
# has all iterables
alliter = {ax: i for ax, i in enumerate(full_normalized) if hasattr(i, "__len__")}
mesh = dict(zip(alliter.keys(), np.ix_(*alliter.values())))
full_tuple = tuple(i if ax not in mesh else mesh[ax] for ax, i in enumerate(noiter))
return full_tuple
def subset_to_blocks(
array: DaskArray,
flatblocks: Sequence[int],
blkshape: tuple[int, ...] | None = None,
reindexer=identity,
chunks_as_array: tuple[np.ndarray, ...] | None = None,
) -> DaskArray:
"""
Advanced indexing of .blocks such that we always get a regular array back.
Parameters
----------
array : dask.array
flatblocks : flat indices of blocks to extract
blkshape : shape of blocks with which to unravel flatblocks
Returns
-------
dask.array
"""
import dask.array
from dask.array.slicing import normalize_index
from dask.base import tokenize
from dask.highlevelgraph import HighLevelGraph
if blkshape is None:
blkshape = array.blocks.shape
if chunks_as_array is None:
chunks_as_array = tuple(np.array(c) for c in array.chunks)
index = _normalize_indexes(array, flatblocks, blkshape)
if all(not isinstance(i, np.ndarray) and i == slice(None) for i in index):
return dask.array.map_blocks(reindexer, array, meta=array._meta)
# The rest is copied from dask.array.core with slight modifications
index = normalize_index(index, array.numblocks)
index = tuple(slice(k, k + 1) if isinstance(k, Integral) else k for k in index)
name = "groupby-cohort-" + tokenize(array, index)
new_keys = array._key_array[index]
squeezed = tuple(np.squeeze(i) if isinstance(i, np.ndarray) else i for i in index)
chunks = tuple(tuple(c[i].tolist()) for c, i in zip(chunks_as_array, squeezed))
keys = itertools.product(*(range(len(c)) for c in chunks))
layer: Graph = {(name,) + key: (reindexer, tuple(new_keys[key].tolist())) for key in keys}
graph = HighLevelGraph.from_collections(name, layer, dependencies=[array])
return dask.array.Array(graph, name, chunks, meta=array)
def _extract_unknown_groups(reduced, dtype) -> tuple[DaskArray]:
import dask.array
from dask.highlevelgraph import HighLevelGraph
groups_token = f"group-{reduced.name}"
first_block = reduced.ndim * (0,)
layer: Graph = {(groups_token, 0): (operator.getitem, (reduced.name, *first_block), "groups")}
groups: tuple[DaskArray] = (
dask.array.Array(
HighLevelGraph.from_collections(groups_token, layer, dependencies=[reduced]),
groups_token,
chunks=((np.nan,),),
meta=np.array([], dtype=dtype),
),
)
return groups
def _unify_chunks(array, by):
from dask.array import from_array, unify_chunks
inds = tuple(range(array.ndim))
# Unifying chunks is necessary for argreductions.
# We need to rechunk before zipping up with the index
# let's always do it anyway
if not is_duck_dask_array(by):
# chunk numpy arrays like the input array
# This removes an extra rechunk-merge layer that would be
# added otherwise
chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
by = from_array(by, chunks=chunks)
_, (array, by) = unify_chunks(array, inds, by, inds[-by.ndim :])
return array, by
def dask_groupby_agg(
array: DaskArray,
by: T_By,
agg: Aggregation,
expected_groups: pd.RangeIndex | None,
axis: T_Axes = (),
fill_value: Any = None,
method: T_Method = "map-reduce",
reindex: bool = False,
engine: T_Engine = "numpy",
sort: bool = True,
chunks_cohorts=None,
) -> tuple[DaskArray, tuple[np.ndarray | DaskArray]]:
import dask.array
from dask.array.core import slices_from_chunks
# I think _tree_reduce expects this
assert isinstance(axis, Sequence)
assert all(ax >= 0 for ax in axis)
inds = tuple(range(array.ndim))
name = f"groupby_{agg.name}"
if expected_groups is None and reindex:
raise ValueError
if method == "cohorts":
assert reindex is False
by_input = by
array, by = _unify_chunks(array, by)
# tokenize here since by has already been hashed if its numpy
token = dask.base.tokenize(array, by, agg, expected_groups, axis, method)
# preprocess the array:
# - for argreductions, this zips the index together with the array block
# - not necessary for blockwise with argreductions
# - if this is needed later, we can fix this then
if agg.preprocess and method != "blockwise":
array = agg.preprocess(array, axis=axis)
# 1. We first apply the groupby-reduction blockwise to generate "intermediates"
# 2. These intermediate results are combined to generate the final result using a
# "map-reduce" or "tree reduction" approach.
# There are two ways:
# a. "_simple_combine": Where it makes sense, we tree-reduce the reduction,
# NOT the groupby-reduction for a speed boost. This is what xhistogram does (effectively),
# It requires that all blocks contain all groups after the initial blockwise step (1) i.e.
# reindex=True, and we must know expected_groups
# b. "_grouped_combine": A more general solution where we tree-reduce the groupby reduction.
# This allows us to discover groups at compute time, support argreductions, lower intermediate
# memory usage (but method="cohorts" would also work to reduce memory in some cases)
labels_are_unknown = is_duck_dask_array(by_input) and expected_groups is None
do_grouped_combine = (
_is_arg_reduction(agg)
or labels_are_unknown
or (_is_first_last_reduction(agg) and array.dtype.kind != "f")
)
do_simple_combine = not do_grouped_combine
if method == "blockwise":
# use the "non dask" code path, but applied blockwise
blockwise_method = partial(_reduce_blockwise, agg=agg, fill_value=fill_value, reindex=reindex)
else:
# choose `chunk_reduce` or `chunk_argreduce`
blockwise_method = partial(
_get_chunk_reduction(agg.reduction_type),
func=agg.chunk,
fill_value=agg.fill_value["intermediate"],
dtype=agg.dtype["intermediate"],
reindex=reindex,
user_dtype=agg.dtype["user"],
)
if do_simple_combine:
# Add a dummy dimension that then gets reduced over
blockwise_method = tlz.compose(_expand_dims, blockwise_method)
# apply reduction on chunk
intermediate = dask.array.blockwise(
partial(
blockwise_method,
axis=axis,
expected_groups=expected_groups if reindex else None,
engine=engine,
sort=sort,
),
# output indices are the same as input indices
# Unlike xhistogram, we don't always know what the size of the group
# dimension will be unless reindex=True
inds,
array,
inds,
by,
inds[-by.ndim :],
concatenate=False,
dtype=array.dtype, # this is purely for show
meta=array._meta,
align_arrays=False,
name=f"{name}-chunk-{token}",
)
group_chunks: tuple[tuple[int | float, ...]]
if method in ["map-reduce", "cohorts"]:
combine: Callable[..., IntermediateDict] = (
partial(_simple_combine, reindex=reindex)
if do_simple_combine
else partial(_grouped_combine, engine=engine, sort=sort)
)
tree_reduce = partial(
dask.array.reductions._tree_reduce,
name=f"{name}-simple-reduce",
dtype=array.dtype,
axis=axis,
keepdims=True,
concatenate=False,
)
aggregate = partial(_aggregate, combine=combine, agg=agg, fill_value=fill_value)
# Each chunk of ``reduced`` is really a dict mapping
# 1. reduction name to array
# 2. "groups" to an array of group labels
# Note: it does not make sense to interpret axis relative to
# shape of intermediate results after the blockwise call
if method == "map-reduce":
reduced = tree_reduce(
intermediate,
combine=partial(combine, agg=agg),
aggregate=partial(aggregate, expected_groups=expected_groups, reindex=reindex),
)
if labels_are_unknown:
groups = _extract_unknown_groups(reduced, dtype=by.dtype)
group_chunks = ((np.nan,),)
else:
assert expected_groups is not None
groups = (expected_groups.to_numpy(),)
group_chunks = ((len(expected_groups),),)
elif method == "cohorts":
assert chunks_cohorts
block_shape = array.blocks.shape[-len(axis) :]
reduced_ = []
groups_ = []
chunks_as_array = tuple(np.array(c) for c in array.chunks)
for blks, cohort in chunks_cohorts.items():
cohort_index = pd.Index(cohort)
reindexer = (
partial(reindex_intermediates, agg=agg, unique_groups=cohort_index)
if do_simple_combine
else identity
)
reindexed = subset_to_blocks(intermediate, blks, block_shape, reindexer, chunks_as_array)
# now that we have reindexed, we can set reindex=True explicitly
reduced_.append(
tree_reduce(
reindexed,
combine=partial(combine, agg=agg, reindex=do_simple_combine),
aggregate=partial(
aggregate,
expected_groups=cohort_index,
reindex=do_simple_combine,
),
)
)
# This is done because pandas promotes to 64-bit types when an Index is created
# So we use the index to generate the return value for consistency with "map-reduce"
# This is important on Windows
groups_.append(cohort_index.values)
reduced = dask.array.concatenate(reduced_, axis=-1)
groups = (np.concatenate(groups_),)
group_chunks = (tuple(len(cohort) for cohort in groups_),)
elif method == "blockwise":
reduced = intermediate
if reindex:
if TYPE_CHECKING:
assert expected_groups is not None
# TODO: we could have `expected_groups` be a dask array with appropriate chunks
# for now, we have a numpy array that is interpreted as listing all group labels
# that are present in every chunk
groups = (expected_groups,)
group_chunks = ((len(expected_groups),),)
else:
# TODO: use chunks_cohorts here; hard because chunks_cohorts does not include all-NaN blocks
# but the array after applying the blockwise op does. We'd have to insert a subsetting op.
# Here one input chunk → one output chunk
# find number of groups in each chunk, this is needed for output chunks
# along the reduced axis
# TODO: this logic is very specialized for the resampling case
slices = slices_from_chunks(tuple(array.chunks[ax] for ax in axis))
groups_in_block = tuple(_unique(by_input[slc]) for slc in slices)
groups = (np.concatenate(groups_in_block),)
ngroups_per_block = tuple(len(grp) for grp in groups_in_block)
group_chunks = (ngroups_per_block,)
else:
raise ValueError(f"Unknown method={method}.")
# Adjust output for any new dimensions added, example for multiple quantiles
new_dims_shape = tuple(dim.size for dim in agg.new_dims if not dim.is_scalar)
new_inds = tuple(range(-len(new_dims_shape), 0))
out_inds = new_inds + inds[: -len(axis)] + (inds[-1],)
output_chunks = new_dims_shape + reduced.chunks[: -len(axis)] + group_chunks
new_axes = dict(zip(new_inds, new_dims_shape))
if method == "blockwise" and len(axis) > 1:
# The final results are available but the blocks along axes
# need to be reshaped to axis=-1
# I don't know that this is possible with blockwise
# All other code paths benefit from an unmaterialized Blockwise layer
reduced = _collapse_blocks_along_axes(reduced, axis, group_chunks)
# Can't use map_blocks because it forces concatenate=True along drop_axes.
result = dask.array.blockwise(
_extract_result,
out_inds,
reduced,
inds,
adjust_chunks=dict(zip(out_inds, output_chunks)),
dtype=agg.dtype["final"],
key=agg.name,
name=f"{name}-{token}",
concatenate=False,
new_axes=new_axes,
)
return (result, groups)
def cubed_groupby_agg(
array: CubedArray,
by: T_By,
agg: Aggregation,
expected_groups: pd.Index | None,
axis: T_Axes = (),
fill_value: Any = None,
method: T_Method = "map-reduce",
reindex: bool = False,
engine: T_Engine = "numpy",
sort: bool = True,
chunks_cohorts=None,
) -> tuple[CubedArray, tuple[np.ndarray | CubedArray]]:
import cubed
import cubed.core.groupby
# I think _tree_reduce expects this
assert isinstance(axis, Sequence)
assert all(ax >= 0 for ax in axis)
if method == "blockwise":
assert by.ndim == 1
assert expected_groups is not None
def _reduction_func(a, by, axis, start_group, num_groups):
# adjust group labels to start from 0 for each chunk
by_for_chunk = by - start_group
expected_groups_for_chunk = pd.RangeIndex(num_groups)
axis = (axis,) # convert integral axis to tuple
blockwise_method = partial(
_reduce_blockwise,
agg=agg,
axis=axis,
expected_groups=expected_groups_for_chunk,
fill_value=fill_value,
engine=engine,
sort=sort,
reindex=reindex,
)
out = blockwise_method(a, by_for_chunk)
return out[agg.name]
num_groups = len(expected_groups)
result = cubed.core.groupby.groupby_blockwise(
array, by, axis=axis, func=_reduction_func, num_groups=num_groups
)
groups = (expected_groups.to_numpy(),)
return (result, groups)
else:
inds = tuple(range(array.ndim))
by_input = by
# Unifying chunks is necessary for argreductions.
# We need to rechunk before zipping up with the index
# let's always do it anyway
if not is_chunked_array(by):
# chunk numpy arrays like the input array
chunks = tuple(array.chunks[ax] if by.shape[ax] != 1 else (1,) for ax in range(-by.ndim, 0))
by = cubed.from_array(by, chunks=chunks, spec=array.spec)
_, (array, by) = cubed.core.unify_chunks(array, inds, by, inds[-by.ndim :])
# Cubed's groupby_reduction handles the generation of "intermediates", and the
# "map-reduce" combination step, so we don't have to do that here.
# Only the equivalent of "_simple_combine" is supported, there is no
# support for "_grouped_combine".
labels_are_unknown = is_chunked_array(by_input) and expected_groups is None
do_simple_combine = not _is_arg_reduction(agg) and not labels_are_unknown
assert do_simple_combine
assert method == "map-reduce"
assert expected_groups is not None
assert reindex is True
assert len(axis) == 1 # one axis/grouping
def _groupby_func(a, by, axis, intermediate_dtype, num_groups):
blockwise_method = partial(
_get_chunk_reduction(agg.reduction_type),
func=agg.chunk,
fill_value=agg.fill_value["intermediate"],
dtype=agg.dtype["intermediate"],
reindex=reindex,
user_dtype=agg.dtype["user"],
axis=axis,
expected_groups=expected_groups,
engine=engine,
sort=sort,
)
out = blockwise_method(a, by)
# Convert dict to one that cubed understands, dropping groups since they are
# known, and the same for every block.
return {f"f{idx}": intermediate for idx, intermediate in enumerate(out["intermediates"])}
def _groupby_combine(a, axis, dummy_axis, dtype, keepdims):
# This is similar to _simple_combine, except the dummy axis and concatenation are handled by cubed;
# only combine over the dummy axis, to preserve grouping along 'axis'.
dtype = dict(dtype)
out = {}
for idx, combine in enumerate(agg.simple_combine):
field = f"f{idx}"
out[field] = combine(a[field], axis=dummy_axis, keepdims=keepdims)
return out
def _groupby_aggregate(a):
# Convert cubed dict to one that _finalize_results works with
results = {"groups": expected_groups, "intermediates": a.values()}
out = _finalize_results(results, agg, axis, expected_groups, fill_value, reindex)
return out[agg.name]
# convert list of dtypes to a structured dtype for cubed
intermediate_dtype = [(f"f{i}", dtype) for i, dtype in enumerate(agg.dtype["intermediate"])]
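# For example, "mean" has two intermediates (a sum and a count), which become
# fields "f0" and "f1" of this structured dtype.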
dtype = agg.dtype["final"]
num_groups = len(expected_groups)
result = cubed.core.groupby.groupby_reduction(
array,
by,
func=_groupby_func,
combine_func=_groupby_combine,
aggregate_func=_groupby_aggregate,
axis=axis,
intermediate_dtype=intermediate_dtype,
dtype=dtype,
num_groups=num_groups,
)
groups = (expected_groups.to_numpy(),)
return (result, groups)
def _collapse_blocks_along_axes(reduced: DaskArray, axis: T_Axes, group_chunks) -> DaskArray:
import dask.array
from dask.highlevelgraph import HighLevelGraph
nblocks = tuple(reduced.numblocks[ax] for ax in axis)
output_chunks = reduced.chunks[: -len(axis)] + ((1,) * (len(axis) - 1),) + group_chunks
# extract results from the dict
ochunks = tuple(range(len(chunks_v)) for chunks_v in output_chunks)
layer2: dict[tuple, tuple] = {}
name = f"reshape-{reduced.name}"
for ochunk in itertools.product(*ochunks):
inchunk = ochunk[: -len(axis)] + np.unravel_index(ochunk[-1], nblocks)
layer2[(name, *ochunk)] = (reduced.name, *inchunk)
layer2: Graph
return dask.array.Array(
HighLevelGraph.from_collections(name, layer2, dependencies=[reduced]),
name,
chunks=output_chunks,
dtype=reduced.dtype,
)
def _extract_result(result_dict: FinalResultsDict, key) -> np.ndarray:
from dask.array.core import deepfirst
# deepfirst should not be needed here, but sometimes we receive a list of dicts?
return deepfirst(result_dict)[key]
def _validate_reindex(
reindex: bool | None,
func,
method: T_MethodOpt,
expected_groups,
any_by_dask: bool,
is_dask_array: bool,
array_dtype: Any,
) -> bool | None:
# logger.debug("Entering _validate_reindex: reindex is {}".format(reindex)) # noqa
def first_or_last():
return func in ["first", "last"] or (_is_first_last_reduction(func) and array_dtype.kind != "f")
all_numpy = not is_dask_array and not any_by_dask
if reindex is True and not all_numpy:
if _is_arg_reduction(func):
raise NotImplementedError
if method == "cohorts" or (method == "blockwise" and not any_by_dask):
raise ValueError("reindex=True is not a valid choice for method='blockwise' or method='cohorts'.")
if first_or_last():
raise ValueError("reindex must be None or False when func is 'first' or 'last.")
if reindex is None:
if method is None:
# logger.debug("Leaving _validate_reindex: method = None, returning None")
return None
if all_numpy:
return True
if first_or_last():
# have to do the grouped_combine since there's no good fill_value
# Also needed for nanfirst, nanlast with no-NaN dtypes
return False
if method == "blockwise":
# for grouping by dask arrays, we set reindex=True
reindex = any_by_dask
elif _is_arg_reduction(func):
reindex = False
elif method == "cohorts":
reindex = False
elif method == "map-reduce":
if expected_groups is None and any_by_dask:
reindex = False
else:
reindex = True
assert isinstance(reindex, bool)
# logger.debug("Leaving _validate_reindex: reindex is {}".format(reindex)) # noqa
return reindex
def _assert_by_is_aligned(shape: tuple[int, ...], by: T_Bys) -> None:
assert all(b.ndim == by[0].ndim for b in by[1:])
for idx, b in enumerate(by):
if not all(j in [i, 1] for i, j in zip(shape[-b.ndim :], b.shape)):
raise ValueError(
"`array` and `by` arrays must be 'aligned' "
"so that such that by_ is broadcastable to array.shape[-by.ndim:] "
"for every array `by_` in `by`. "
"Either array.shape[-by_.ndim :] == by_.shape or the only differences "
"should be size-1 dimensions in by_."
f"Received array of shape {shape} but "
f"array {idx} in `by` has shape {b.shape}."
)
@overload
def _convert_expected_groups_to_index(
expected_groups: tuple[None, ...], isbin: Sequence[bool], sort: bool
) -> tuple[None, ...]: ...
@overload
def _convert_expected_groups_to_index(
expected_groups: T_ExpectTuple, isbin: Sequence[bool], sort: bool
) -> T_ExpectIndexTuple: ...
def _convert_expected_groups_to_index(
expected_groups: T_ExpectOptTuple, isbin: Sequence[bool], sort: bool
) -> T_ExpectIndexOptTuple:
out: list[T_ExpectIndexOpt] = []
for ex, isbin_ in zip(expected_groups, isbin):
if isinstance(ex, pd.IntervalIndex) or (isinstance(ex, pd.Index) and not isbin_):
if sort:
out.append(ex.sort_values())
else:
out.append(ex)
elif ex is not None:
if isbin_:
out.append(pd.IntervalIndex.from_breaks(ex))
else:
if sort:
ex = np.sort(ex)
out.append(pd.Index(ex))
else:
assert ex is None
out.append(None)
return tuple(out)
def _lazy_factorize_wrapper(*by: T_By, **kwargs) -> np.ndarray:
group_idx, *_ = factorize_(by, **kwargs)
return group_idx
def _factorize_multiple(
by: T_Bys,
expected_groups: T_ExpectIndexOptTuple,
any_by_dask: bool,
sort: bool = True,
) -> tuple[tuple[np.ndarray], tuple[np.ndarray, ...], tuple[int, ...]]:
kwargs: FactorizeKwargs = dict(
axes=(), # always (), we offset later if necessary.
expected_groups=expected_groups,
fastpath=True,
# This is the only way it makes sense I think.
# reindex controls what's actually allocated in chunk_reduce
# At this point, we care about an accurate conversion to codes.
reindex=True,
sort=sort,
)
if any_by_dask:
import dask.array
# unifying chunks will make sure all arrays in `by` are dask arrays
# with compatible chunks, even if there was originally a numpy array
inds = tuple(range(by[0].ndim))
chunks, by_ = dask.array.unify_chunks(*itertools.chain(*zip(by, (inds,) * len(by))))
group_idx = dask.array.map_blocks(
_lazy_factorize_wrapper,
*by_,
chunks=tuple(chunks.values()),
meta=np.array((), dtype=np.int64),
**kwargs,
)
fg, gs = [], []
for by_, expect in zip(by, expected_groups):
if expect is None:
if is_duck_dask_array(by_):
raise ValueError("Please provide expected_groups when grouping by a dask array.")
found_group = pd.unique(by_.reshape(-1))
else:
found_group = expect.to_numpy()
fg.append(found_group)
gs.append(len(found_group))
found_groups = tuple(fg)
grp_shape = tuple(gs)
else:
kwargs["by"] = by
group_idx, found_groups, grp_shape, *_ = factorize_(**kwargs)
return (group_idx,), found_groups, grp_shape
@overload
def _validate_expected_groups(nby: int, expected_groups: None) -> tuple[None, ...]: ...
@overload
def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroups) -> T_ExpectTuple: ...
def _validate_expected_groups(nby: int, expected_groups: T_ExpectedGroupsOpt) -> T_ExpectOptTuple:
if expected_groups is None:
return (None,) * nby
if nby == 1 and not isinstance(expected_groups, tuple):
if isinstance(expected_groups, pd.Index | np.ndarray):
return (expected_groups,)
else:
array = np.asarray(expected_groups)
if np.issubdtype(array.dtype, np.integer):
# preserve default dtypes
# on pandas 1.5/2, on windows
# when a list is passed
array = array.astype(np.int64)
return (array,)
if nby > 1 and not isinstance(expected_groups, tuple): # TODO: test for list
raise ValueError(
"When grouping by multiple variables, expected_groups must be a tuple "
"of either arrays or objects convertible to an array (like lists). "
"For example `expected_groups=(np.array([1, 2, 3]), ['a', 'b', 'c'])`."
f"Received a {type(expected_groups).__name__} instead. "
"When grouping by a single variable, you can pass an array or something "
"convertible to an array for convenience: `expected_groups=['a', 'b', 'c']`."
)
if TYPE_CHECKING:
assert isinstance(expected_groups, tuple)
if len(expected_groups) != nby:
raise ValueError(
f"Must have same number of `expected_groups` (received {len(expected_groups)}) "
f" and variables to group by (received {nby})."
)
return expected_groups
def _choose_method(
method: T_MethodOpt, preferred_method: T_Method, agg: Aggregation, by, nax: int
) -> T_Method:
if method is None:
logger.debug("_choose_method: method is None")
if agg.chunk == (None,):
if preferred_method != "blockwise":
raise ValueError(
f"Aggregation {agg.name} is only supported for `method='blockwise'`, "
"but the chunking is not right."
)
logger.debug("_choose_method: choosing 'blockwise'")
return "blockwise"
if nax != by.ndim:
logger.debug("_choose_method: choosing 'map-reduce'")
return "map-reduce"
if _is_arg_reduction(agg) and preferred_method == "blockwise":
return "cohorts"
logger.debug(f"_choose_method: choosing preferred_method={preferred_method}") # noqa
return preferred_method
else:
return method
def _choose_engine(by, agg: Aggregation):
dtype = agg.dtype["user"]
not_arg_reduce = not _is_arg_reduction(agg)
if agg.name in ["quantile", "nanquantile", "median", "nanmedian"]:
logger.debug(f"_choose_engine: Choosing 'flox' since {agg.name}")
return "flox"
# numbagg only supports nan-skipping reductions
# without dtype specified
has_blockwise_nan_skipping = (agg.chunk[0] is None and "nan" in agg.name) or any(
(isinstance(func, str) and "nan" in func) for func in agg.chunk
)
if HAS_NUMBAGG:
if agg.name in ["all", "any"] or (not_arg_reduce and has_blockwise_nan_skipping and dtype is None):
logger.debug("_choose_engine: Choosing 'numbagg'")
return "numbagg"
if not_arg_reduce and (not is_duck_dask_array(by) and _issorted(by)):
logger.debug("_choose_engine: Choosing 'flox'")
return "flox"
else:
logger.debug("_choose_engine: Choosing 'numpy'")
return "numpy"
def groupby_reduce(
array: np.ndarray | DaskArray,
*by: T_By,
func: T_Agg,
expected_groups: T_ExpectedGroupsOpt = None,
sort: bool = True,
isbin: T_IsBins = False,
axis: T_AxesOpt = None,
fill_value=None,
dtype: np.typing.DTypeLike = None,
min_count: int | None = None,
method: T_MethodOpt = None,
engine: T_EngineOpt = None,
reindex: bool | None = None,
finalize_kwargs: dict[Any, Any] | None = None,
) -> tuple[DaskArray, Unpack[tuple[np.ndarray | DaskArray, ...]]]:
"""
GroupBy reductions using tree reductions for dask.array
Parameters
----------
array : ndarray or DaskArray
Array to be reduced, possibly nD
*by : ndarray or DaskArray
Array of labels to group over. Must be aligned with ``array`` so that
``array.shape[-by.ndim :] == by.shape`` or any disagreements in that
equality check are for dimensions of size 1 in `by`.
func : {"all", "any", "count", "sum", "nansum", "mean", "nanmean", \
"max", "nanmax", "min", "nanmin", "argmax", "nanargmax", "argmin", "nanargmin", \
"quantile", "nanquantile", "median", "nanmedian", "mode", "nanmode", \
"first", "nanfirst", "last", "nanlast"} or Aggregation
Single function name or an Aggregation instance
expected_groups : Sequence, optional
Expected unique labels.
isbin : bool, optional
Are ``expected_groups`` bin edges?
sort : bool, optional
Whether groups should be returned in sorted order. Only applies for dask
reductions when ``method`` is not ``"map-reduce"``. For ``"map-reduce"``, the groups
are always sorted.
axis : None or int or Sequence[int], optional
If None, reduce across all dimensions of by
Else, reduce across corresponding axes of array
Negative integers are normalized using array.ndim
fill_value : Any
Value to assign when a label in ``expected_groups`` is not present.
dtype : data-type, optional
DType for the output. Can be anything that is accepted by ``np.dtype``.
min_count : int, default: None
The required number of valid values to perform the operation. If
fewer than min_count non-NA values are present the result will be
NA. Only used if skipna is set to True or defaults to True for the
array's dtype.
method : {"map-reduce", "blockwise", "cohorts"}, optional
Strategy for reduction of dask arrays only:
* ``"map-reduce"``:
First apply the reduction blockwise on ``array``, then
combine a few neighbouring blocks, apply the reduction.
Continue until finalizing. Usually, ``func`` will need
to be an Aggregation instance for this method to work.
Common aggregations are implemented.
* ``"blockwise"``:
Only reduce using blockwise and avoid aggregating blocks
together. Useful for resampling-style reductions where group
members are always together. If `by` is 1D, `array` is automatically
rechunked so that chunk boundaries line up with group boundaries
i.e. each block contains all members of any group present
in that block. For nD `by`, you must make sure that all members of a group
are present in a single block.
* ``"cohorts"``:
Finds group labels that tend to occur together ("cohorts"),
indexes out cohorts and reduces that subset using "map-reduce",
repeat for all cohorts. This works well for many time groupings
where the group labels repeat at regular intervals like 'hour',
'month', 'dayofyear' etc. Optimize chunking ``array`` for this
method by first rechunking using ``rechunk_for_cohorts``
(for 1D ``by`` only).
engine : {"flox", "numpy", "numba", "numbagg"}, optional
Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk:
* ``"numpy"``:
Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``.
This is the default choice because it works for most array types.
* ``"flox"``:
Use an internal implementation where the data is sorted so that
all members of a group occur sequentially, and then numpy.ufunc.reduceat
is used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
* ``"numba"``:
Use the implementations in ``numpy_groupies.aggregate_numba``.
* ``"numbagg"``:
Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
reindex : bool, optional
Whether to "reindex" the blockwise results to ``expected_groups`` (possibly automatically detected).
If True, the intermediate result of the blockwise groupby-reduction has a value for all expected groups,
and the final result is a simple reduction of those intermediates. In nearly all cases, this is a significant
boost in computation speed. For cases like time grouping, this may result in large intermediates relative to the
original block size. Avoid that by using ``method="cohorts"``. By default, it is turned off for argreductions.
finalize_kwargs : dict, optional
Kwargs passed to finalize the reduction such as ``ddof`` for var, std or ``q`` for quantile.
Returns
-------
result
Aggregated result
*groups
Group labels
See Also
--------
xarray.xarray_reduce
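Examples
--------
A minimal sketch of a 1D reduction over integer labels; the values are
purely illustrative:

>>> import numpy as np
>>> from flox.core import groupby_reduce
>>> labels = np.array([0, 0, 1, 1, 2])
>>> values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
>>> result, groups = groupby_reduce(values, labels, func="nansum")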
"""
if engine == "flox" and _is_arg_reduction(func):
raise NotImplementedError(
"argreductions not supported for engine='flox' yet."
"Try engine='numpy' or engine='numba' instead."
)
if engine == "numbagg" and dtype is not None:
raise NotImplementedError(
"numbagg does not support the `dtype` kwarg. Either cast your "
"input arguments to `dtype` or use a different `engine`: "
"'flox' or 'numpy' or 'numba'. "
"See https://github.com/numbagg/numbagg/issues/121."
)
if func in ["quantile", "nanquantile"]:
if finalize_kwargs is None or "q" not in finalize_kwargs:
raise ValueError("Please pass `q` for quantile calculations.")
else:
nq = len(_atleast_1d(finalize_kwargs["q"]))
if nq > 1 and engine == "numpy":
raise ValueError(
"Multiple quantiles not supported with engine='numpy'."
"Use engine='flox' instead (it is also much faster), "
"or set engine=None to use the default."
)
bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by)
nby = len(bys)
by_is_dask = tuple(is_duck_dask_array(b) for b in bys)
any_by_dask = any(by_is_dask)
provided_expected = expected_groups is not None
if engine == "numbagg" and _is_arg_reduction(func) and (any_by_dask or is_duck_dask_array(array)):
# There is only one test that fails, but I can't figure
# out why without deep debugging.
# just disable for now.
# test_groupby_reduce_axis_subset_against_numpy
# for array is 3D dask, by is 3D dask, axis=2
# We are falling back to numpy for the arg reduction,
# so presumably something is going wrong
raise NotImplementedError(
"argreductions not supported for engine='numbagg' yet."
"Try engine='numpy' or engine='numba' instead."
)
if method == "cohorts" and any_by_dask:
raise ValueError(f"method={method!r} can only be used when grouping by numpy arrays.")
if not is_duck_array(array):
array = np.asarray(array)
reindex = _validate_reindex(
reindex,
func,
method,
expected_groups,
any_by_dask,
is_duck_dask_array(array),
array.dtype,
)
is_bool_array = np.issubdtype(array.dtype, bool)
array = array.astype(np.int_) if is_bool_array else array
isbins = _atleast_1d(isbin, nby)
_assert_by_is_aligned(array.shape, bys)
expected_groups = _validate_expected_groups(nby, expected_groups)
for idx, (expect, is_dask) in enumerate(zip(expected_groups, by_is_dask)):
if is_dask and (reindex or nby > 1) and expect is None:
raise ValueError(
f"`expected_groups` for array {idx} in `by` cannot be None since it is a dask.array."
)
# We convert to pd.Index since that lets us know if we are binning or not
# (pd.IntervalIndex or not)
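# For example, with isbin=True, expected_groups=[0, 10, 20] becomes
# pd.IntervalIndex.from_breaks([0, 10, 20]), i.e. the bins (0, 10] and (10, 20].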
expected_groups = _convert_expected_groups_to_index(expected_groups, isbins, sort)
# Don't factorize early only when
# grouping by dask arrays, and not having expected_groups
factorize_early = not (
# can't do it if we are grouping by dask array but don't have expected_groups
any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups))
)
expected_: pd.RangeIndex | None
if factorize_early:
bys, final_groups, grp_shape = _factorize_multiple(
bys,
expected_groups,
any_by_dask=any_by_dask,
sort=sort,
)
expected_ = pd.RangeIndex(math.prod(grp_shape))
else:
assert expected_groups == (None,)
expected_ = None
assert len(bys) == 1
(by_,) = bys
if axis is None:
axis_ = tuple(array.ndim + np.arange(-by_.ndim, 0))
else:
axis_ = normalize_axis_tuple(axis, array.ndim)
nax = len(axis_)
has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
has_cubed = is_duck_cubed_array(array) or is_duck_cubed_array(by_)
if _is_first_last_reduction(func):
if has_dask and nax != 1:
raise ValueError(
"For dask arrays: first, last, nanfirst, nanlast reductions are "
"only supported along a single axis. Please reshape appropriately."
)
elif nax not in [1, by_.ndim]:
raise ValueError(
"first, last, nanfirst, nanlast reductions are only supported "
"along a single axis or when reducing across all dimensions of `by`."
)
if nax == 1 and by_.ndim > 1 and expected_ is None:
# When we reduce along all axes, we are guaranteed to see all
# groups in the final combine stage, so everything works.
# This is not necessarily true when reducing along a subset of axes
# (of by)
# TODO: Does this depend on chunking of by?
# For e.g., we could relax this if there is only one chunk along all
# by dim != axis?
raise NotImplementedError("Please provide ``expected_groups`` when not reducing along all axes.")
assert nax <= by_.ndim
if nax < by_.ndim:
by_ = _move_reduce_dims_to_end(by_, tuple(-array.ndim + ax + by_.ndim for ax in axis_))
array = _move_reduce_dims_to_end(array, axis_)
axis_ = tuple(array.ndim + np.arange(-nax, 0))
nax = len(axis_)
# When axis is a subset of possible values, npg will
# apply the fill_value to groups that don't exist along a particular axis (for example),
# since these count as a group that is absent.
# fill_value applies to all-NaN groups as well as labels in expected_groups that are not found.
# The only way to do this consistently is mask out using min_count
# Consider np.sum([np.nan]) = np.nan, np.nansum([np.nan]) = 0
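# For example, with expected_groups provided and fill_value=np.nan, a label with
# no matching data must come out as NaN; min_count_=1 below masks out such
# zero-count groups so that fill_value is applied consistently.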
if min_count is None:
if nax < by_.ndim or (fill_value is not None and provided_expected):
min_count_: int = 1
else:
min_count_ = 0
else:
min_count_ = min_count
# TODO: set in xarray?
if min_count_ > 0 and func in ["nansum", "nanprod"] and fill_value is None:
# nansum, nanprod have fill_value=0, 1
# overwrite that when min_count is set
fill_value = np.nan
kwargs = dict(axis=axis_, fill_value=fill_value)
agg = _initialize_aggregation(func, dtype, array.dtype, fill_value, min_count_, finalize_kwargs)
# Need to set this early using `agg`
# It cannot be done in the core loop of chunk_reduce
# since we "prepare" the data for flox.
kwargs["engine"] = _choose_engine(by_, agg) if engine is None else engine
groups: tuple[np.ndarray | DaskArray, ...]
if has_cubed:
if method is None:
method = "map-reduce"
if method not in ("map-reduce", "blockwise"):
raise NotImplementedError(
"Reduction for Cubed arrays is only implemented for methods 'map-reduce' and 'blockwise'."
)
partial_agg = partial(cubed_groupby_agg, **kwargs)
result, groups = partial_agg(
array=array,
by=by_,
expected_groups=expected_,
agg=agg,
reindex=bool(reindex),
method=method,
sort=sort,
)
return (result, groups)
elif not has_dask:
results = _reduce_blockwise(
array,
by_,
agg,
expected_groups=expected_,
reindex=bool(reindex),
sort=sort,
**kwargs,
)
groups = (results["groups"],)
result = results[agg.name]
else:
if TYPE_CHECKING:
# TODO: How else to narrow that array.chunks is there?
assert isinstance(array, DaskArray)
if (not any_by_dask and method is None) or method == "cohorts":
preferred_method, chunks_cohorts = find_group_cohorts(
by_,
[array.chunks[ax] for ax in range(-by_.ndim, 0)],
expected_groups=expected_,
# when provided with cohorts, we *always* 'merge'
merge=(method == "cohorts"),
)
else:
preferred_method = "map-reduce"
chunks_cohorts = {}
method = _choose_method(method, preferred_method, agg, by_, nax)
if agg.chunk[0] is None and method != "blockwise":
raise NotImplementedError(
f"Aggregation {agg.name!r} is only implemented for dask arrays when method='blockwise'."
f"Received method={method!r}"
)
if (
_is_arg_reduction(agg)
and method == "blockwise"
and not all(nchunks == 1 for nchunks in array.numblocks[-nax:])
):
raise NotImplementedError(
"arg-reductions are not supported with method='blockwise', use 'cohorts' instead."
)
if nax != by_.ndim and method in ["blockwise", "cohorts"]:
raise NotImplementedError(
"Must reduce along all dimensions of `by` when method != 'map-reduce'."
f"Received method={method!r}"
)
# TODO: clean this up
reindex = _validate_reindex(
reindex,
func,
method,
expected_,
any_by_dask,
is_duck_dask_array(array),
array.dtype,
)
if TYPE_CHECKING:
assert method is not None
# TODO: just do this in dask_groupby_agg
# we always need some fill_value (see above) so choose the default if needed
if kwargs["fill_value"] is None:
kwargs["fill_value"] = agg.fill_value[agg.name]
partial_agg = partial(dask_groupby_agg, **kwargs)
# if preferred method is already blockwise, no need to rechunk
if preferred_method != "blockwise" and method == "blockwise" and by_.ndim == 1:
array = rechunk_for_blockwise(array, axis=-1, labels=by_)
result, groups = partial_agg(
array=array,
by=by_,
expected_groups=expected_,
agg=agg,
reindex=bool(reindex),
method=method,
chunks_cohorts=chunks_cohorts,
sort=sort,
)
if sort and method != "map-reduce":
assert len(groups) == 1
sorted_idx = np.argsort(groups[0])
# This optimization helps specifically with resampling
if not _issorted(sorted_idx):
result = result[..., sorted_idx]
groups = (groups[0][sorted_idx],)
if factorize_early:
assert len(groups) == 1
(groups_,) = groups
# nan group labels are factorized to -1, and preserved
# now we get rid of them by reindexing
# First, for "blockwise", we can have -1 repeated in different blocks
# This breaks the reindexing so remove those first.
if method == "blockwise" and (mask := groups_ == -1).sum(axis=-1) > 1:
result = result[..., ~mask]
groups_ = groups_[..., ~mask]
# This reindex also handles bins with no data
result = reindex_(result, from_=groups_, to=expected_, fill_value=fill_value).reshape(
result.shape[:-1] + grp_shape
)
groups = final_groups
if is_bool_array and (_is_minmax_reduction(func) or _is_first_last_reduction(func)):
result = result.astype(bool)
return (result, *groups)
def groupby_scan(
array: np.ndarray | DaskArray,
*by: T_By,
func: T_Scan,
expected_groups: T_ExpectedGroupsOpt = None,
axis: int | tuple[int] = -1,
dtype: np.typing.DTypeLike = None,
method: T_MethodOpt = None,
engine: T_EngineOpt = None,
) -> np.ndarray | DaskArray:
"""
GroupBy scans (accumulations) using parallel scans for dask.array
Parameters
----------
array : ndarray or DaskArray
Array to be reduced, possibly nD
*by : ndarray or DaskArray
Array of labels to group over. Must be aligned with ``array`` so that
``array.shape[-by.ndim :] == by.shape`` or any disagreements in that
equality check are for dimensions of size 1 in `by`.
func : {"nancumsum", "ffill", "bfill"} or Scan
Single function name or a Scan instance
expected_groups : Sequence, optional
Expected unique labels.
axis : int, optional
The axis along which to perform the scan. Only a single axis is supported.
Negative integers are normalized using array.ndim.
dtype : data-type, optional
DType for the output. Can be anything that is accepted by ``np.dtype``.
method : {"blockwise", "cohorts"}, optional
Strategy for reduction of dask arrays only:
* ``"blockwise"``:
Only scan using blockwise and avoid aggregating blocks
together. Useful for resampling-style groupby problems where group
members are always together. If `by` is 1D, `array` is automatically
rechunked so that chunk boundaries line up with group boundaries
i.e. each block contains all members of any group present
in that block. For nD `by`, you must make sure that all members of a group
are present in a single block.
* ``"cohorts"``:
Finds group labels that tend to occur together ("cohorts"),
indexes out cohorts and reduces that subset using "map-reduce",
repeat for all cohorts. This works well for many time groupings
where the group labels repeat at regular intervals like 'hour',
'month', 'dayofyear' etc. Optimize chunking ``array`` for this
method by first rechunking using ``rechunk_for_cohorts``
(for 1D ``by`` only).
engine : {"flox", "numpy", "numba", "numbagg"}, optional
Algorithm to compute the groupby reduction on non-dask arrays and on each dask chunk:
* ``"numpy"``:
Use the vectorized implementations in ``numpy_groupies.aggregate_numpy``.
This is the default choice because it works for most array types.
* ``"flox"``:
Use an internal implementation where the data is sorted so that
all members of a group occur sequentially, and then numpy.ufunc.reduceat
is used for the reduction. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
* ``"numba"``:
Use the implementations in ``numpy_groupies.aggregate_numba``.
* ``"numbagg"``:
Use the reductions supported by ``numbagg.grouped``. This will fall back to ``numpy_groupies.aggregate_numpy``
for a reduction that is not yet implemented.
Returns
-------
result
Scanned result, same shape as ``array``
See Also
--------
xarray.xarray_reduce
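Examples
--------
A minimal sketch of a grouped cumulative sum over 1D integer labels; the
values are purely illustrative:

>>> import numpy as np
>>> from flox.core import groupby_scan
>>> labels = np.array([0, 0, 1, 1, 1])
>>> values = np.array([1.0, 2.0, 3.0, 4.0, 5.0])
>>> result = groupby_scan(values, labels, func="nancumsum")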
"""
axis = _atleast_1d(axis)
if len(axis) > 1:
raise NotImplementedError("Scans are only supported along a single dimension.")
bys: T_Bys = tuple(np.asarray(b) if not is_duck_array(b) else b for b in by)
nby = len(by)
by_is_dask = tuple(is_duck_dask_array(b) for b in bys)
any_by_dask = any(by_is_dask)
axis_ = normalize_axis_tuple(axis, array.ndim)
if engine is not None:
raise NotImplementedError("Setting `engine` is not supported for scans yet.")
if method is not None:
raise NotImplementedError("Setting `method` is not supported for scans yet.")
if engine is None:
engine = "flox"
assert engine == "flox"
if not is_duck_array(array):
array = np.asarray(array)
if isinstance(func, str):
agg = AGGREGATIONS[func]
assert isinstance(agg, Scan)
agg = copy.deepcopy(agg)
if (agg == AGGREGATIONS["ffill"] or agg == AGGREGATIONS["bfill"]) and array.dtype.kind != "f":
# nothing to do, no NaNs!
return array
is_bool_array = np.issubdtype(array.dtype, bool)
array = array.astype(np.int_) if is_bool_array else array
if expected_groups is not None:
raise NotImplementedError("Setting `expected_groups` and binning is not supported yet.")
expected_groups = _validate_expected_groups(nby, expected_groups)
expected_groups = _convert_expected_groups_to_index(expected_groups, isbin=(False,) * nby, sort=False)
# Don't factorize early only when
# grouping by dask arrays, and not having expected_groups
factorize_early = not (
# can't do it if we are grouping by dask array but don't have expected_groups
any(is_dask and ex_ is None for is_dask, ex_ in zip(by_is_dask, expected_groups))
)
if factorize_early:
bys, final_groups, grp_shape = _factorize_multiple(
bys,
expected_groups,
any_by_dask=any_by_dask,
sort=False,
)
else:
raise NotImplementedError
assert len(bys) == 1
by_: np.ndarray
(by_,) = bys
has_dask = is_duck_dask_array(array) or is_duck_dask_array(by_)
# TODO: move to aggregate_npg.py
if agg.name in ["cumsum", "nancumsum"] and array.dtype.kind in ["i", "u"]:
# https://numpy.org/doc/stable/reference/generated/numpy.cumsum.html
# it defaults to the dtype of a, unless a
# has an integer dtype with a precision less than that of the default platform integer.
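# For example, np.cumsum on an int8 array returns the platform integer np.int_;
# np.result_type below mirrors that promotion.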
if array.dtype.kind == "i":
agg.dtype = np.result_type(array.dtype, np.int_)
elif array.dtype.kind == "u":
agg.dtype = np.result_type(array.dtype, np.uint)
else:
agg.dtype = array.dtype if dtype is None else dtype
(single_axis,) = axis_ # type: ignore[misc]
# avoid some roundoff error when we can.
if by_.shape[-1] == 1 or by_.shape == grp_shape:
return array.astype(agg.dtype)
# Made a design choice here to have `preprocess` handle both array and group_idx
# Example: for reversing, we need to reverse the whole array, not just reverse
# each block independently
inp = AlignedArrays(array=array, group_idx=by_)
if agg.preprocess:
inp = agg.preprocess(inp)
if not has_dask:
final_state = chunk_scan(inp, axis=single_axis, agg=agg, dtype=agg.dtype)
result = _finalize_scan(final_state)
else:
result = dask_groupby_scan(inp.array, inp.group_idx, axes=axis_, agg=agg)
# Made a design choice here to have `postprocess` handle both array and group_idx
out = AlignedArrays(array=result, group_idx=by_)
if agg.finalize:
out = agg.finalize(out)
return out.array
def chunk_scan(inp: AlignedArrays, *, axis: int, agg: Scan, dtype=None, keepdims=None) -> ScanState:
assert axis == inp.array.ndim - 1
# I don't think we need to re-factorize here unless we are grouping by a dask array
accumulated = generic_aggregate(
inp.group_idx,
inp.array,
axis=axis,
engine="flox",
func=agg.scan,
dtype=dtype,
fill_value=agg.identity,
)
result = AlignedArrays(array=accumulated, group_idx=inp.group_idx)
return ScanState(result=result, state=None)
def grouped_reduce(inp: AlignedArrays, *, agg: Scan, axis: int, keepdims=None) -> ScanState:
assert axis == inp.array.ndim - 1
reduced = chunk_reduce(
inp.array,
inp.group_idx,
func=(agg.reduction,),
axis=axis,
engine="flox",
dtype=inp.array.dtype,
fill_value=agg.identity,
expected_groups=None,
)
return ScanState(
state=AlignedArrays(array=reduced["intermediates"][0], group_idx=reduced["groups"]),
result=None,
)
def _zip(group_idx: np.ndarray, array: np.ndarray) -> AlignedArrays:
return AlignedArrays(group_idx=group_idx, array=array)
def _finalize_scan(block: ScanState) -> np.ndarray:
assert block.result is not None
return block.result.array
def dask_groupby_scan(array, by, axes: T_Axes, agg: Scan) -> DaskArray:
from dask.array import map_blocks
from dask.array.reductions import cumreduction as scan
from flox.aggregations import scan_binary_op
if len(axes) > 1:
raise NotImplementedError("Scans are only supported along a single axis.")
(axis,) = axes
array, by = _unify_chunks(array, by)
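# Rough sketch of the approach, using dask's cumreduction with method="blelloch":
# each block is scanned independently via chunk_scan, per-block group totals come
# from grouped_reduce (the preop), and scan_binary_op folds the state of
# preceding blocks into each block's result.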
# 1. zip together group indices & array
zipped = map_blocks(
_zip,
by,
array,
dtype=array.dtype,
meta=array._meta,
name="groupby-scan-preprocess",
)
scan_ = partial(chunk_scan, agg=agg)
# dask tokenizing error workaround
scan_.__name__ = scan_.func.__name__ # type: ignore[attr-defined]
# 2. Run the scan
accumulated = scan(
func=scan_,
binop=partial(scan_binary_op, agg=agg),
ident=agg.identity,
x=zipped,
axis=axis,
# TODO: support method="sequential" here.
method="blelloch",
preop=partial(grouped_reduce, agg=agg),
dtype=agg.dtype,
)
# 3. Unzip and extract the final result array, discard groups
result = map_blocks(_finalize_scan, accumulated, dtype=agg.dtype)
assert result.chunks == array.chunks
return result