{ "cells": [ { "cell_type": "markdown", "id": "0", "metadata": {}, "source": [ "# Custom Aggregations\n", "\n", "This notebook is motivated by a\n", "[post](https://discourse.pangeo.io/t/using-xhistogram-to-bin-measurements-at-particular-stations/2365/4)\n", "on the Pangeo discourse forum.\n", "\n", "> Even better would be a command that lets me simply do the following.\n", ">\n", "> A = da.groupby(['lon_bins', 'lat_bins']).mode()\n", "\n", "This notebook will describe how to accomplish this using a custom `Aggregation`.\n", "\n", "\n", "```{tip}\n", "flox now supports `mode`, `nanmode`, `quantile`, `nanquantile`, `median`, `nanmedian` using exactly the same \n", "approach as shown below\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "id": "1", "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "import numpy_groupies as npg\n", "import xarray as xr\n", "\n", "import flox.xarray\n", "from flox import Aggregation\n", "from flox.aggregations import mean\n", "\n", "# define latitude and longitude bins\n", "binsize = 1.0 # 1°x1° bins\n", "lon_min, lon_max, lat_min, lat_max = [-180, 180, -65, 65]\n", "lon_bins = np.arange(lon_min, lon_max, binsize)\n", "lat_bins = np.arange(lat_min, lat_max, binsize)\n", "\n", "size = 28397\n", "\n", "\n", "da = xr.DataArray(\n", " np.random.randint(0, 7, size=size),\n", " dims=\"profile\",\n", " coords={\n", " \"lat\": (\n", " \"profile\",\n", " (np.random.random(size) - 0.5) * (lat_max - lat_min),\n", " ),\n", " \"lon\": (\n", " \"profile\",\n", " (np.random.random(size) - 0.5) * (lon_max - lon_min),\n", " ),\n", " },\n", " name=\"label\",\n", ")\n", "da" ] }, { "cell_type": "markdown", "id": "2", "metadata": {}, "source": [ "## A built-in reduction\n", "\n", "First a simple example of lat-lon binning using a built-in reduction: mean\n" ] }, { "cell_type": "code", "execution_count": null, "id": "3", "metadata": {}, "outputs": [], "source": [ "binned_mean = flox.xarray.xarray_reduce(\n", " da,\n", " da.lat,\n", " da.lon,\n", " func=\"mean\", # built-in\n", " expected_groups=(lat_bins, lon_bins),\n", " isbin=(True, True),\n", ")\n", "binned_mean.plot()" ] }, { "cell_type": "markdown", "id": "4", "metadata": {}, "source": [ "## Aggregations\n", "\n", "flox knows how to interperet `func=\"mean\"` because it's been implemented in\n", "`aggregations.py` as an\n", "[Aggregation](https://flox.readthedocs.io/en/latest/generated/flox.aggregations.Aggregation.html)\n", "\n", "An `Aggregation` is a blueprint for computing an aggregation, with both numpy\n", "and dask data.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "5", "metadata": {}, "outputs": [], "source": [ "print(type(mean))\n", "mean" ] }, { "cell_type": "markdown", "id": "6", "metadata": {}, "source": [ "Here's how the mean Aggregation is created\n", "\n", "```python\n", "mean = Aggregation(\n", " name=\"mean\",\n", "\n", " # strings in the following are built-in grouped reductions\n", " # implemented by the underlying \"engine\": flox or numpy_groupies or numbagg\n", "\n", " # for pure numpy inputs\n", " numpy=\"mean\",\n", "\n", " # The next are for dask inputs and describe how to reduce\n", " # the data in parallel\n", " chunk=(\"sum\", \"nanlen\"), # first compute these blockwise : (grouped_sum, grouped_count)\n", " combine=(\"sum\", \"sum\"), # reduce intermediate results (sum the sums, sum the counts)\n", " finalize=lambda sum_, count: sum_ / count, # final mean value (divide sum by count)\n", "\n", " fill_value=(0, 0), # fill value for intermediate sums and counts when groups have no members\n", " dtypes=(None, np.intp), # optional dtypes for intermediates\n", " final_dtype=np.floating, # final dtype for output\n", ")\n", "```\n" ] }, { "cell_type": "markdown", "id": "7", "metadata": {}, "source": [ "## Defining a custom aggregation\n", "\n", "First we'll need a function that executes the grouped reduction given numpy\n", "inputs.\n", "\n", "Custom functions are required to have this signature (copied form\n", "numpy_groupies):\n", "\n", "```python\n", "\n", "def custom_grouped_reduction(\n", " group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None\n", "):\n", " \"\"\"\n", " Parameters\n", " ----------\n", "\n", " group_idx : np.ndarray, 1D\n", " integer codes for group labels (1D)\n", " array : np.ndarray, nD\n", " values to reduce (nD)\n", " axis : int\n", " axis of array along which to reduce. Requires array.shape[axis] == len(group_idx)\n", " size : int, optional\n", " expected number of groups. If none, output.shape[-1] == number of uniques in group_idx\n", " fill_value : optional\n", " fill_value for when number groups in group_idx is less than size\n", " dtype : optional\n", " dtype of output\n", "\n", " Returns\n", " -------\n", "\n", " np.ndarray with array.shape[-1] == size, containing a single value per group\n", " \"\"\"\n", " pass\n", "```\n", "\n", "Since numpy_groupies does not implement a median, we'll do it ourselves by\n", "passing `np.median` to `numpy_groupies.aggregate_numpy.aggregate`. This will\n", "loop over all groups, and then execute `np.median` on the group members in\n", "serial. It is not fast, but quite convenient.\n" ] }, { "cell_type": "code", "execution_count": null, "id": "8", "metadata": {}, "outputs": [], "source": [ "def grouped_median(group_idx, array, *, axis=-1, size=None, fill_value=None, dtype=None):\n", " return npg.aggregate_numpy.aggregate(\n", " group_idx,\n", " array,\n", " func=np.median,\n", " axis=axis,\n", " size=size,\n", " fill_value=fill_value,\n", " dtype=dtype,\n", " )" ] }, { "cell_type": "markdown", "id": "9", "metadata": {}, "source": [ "Now we create the `Aggregation`\n" ] }, { "cell_type": "code", "execution_count": null, "id": "10", "metadata": {}, "outputs": [], "source": [ "agg_median = Aggregation(\n", " name=\"median\",\n", " numpy=grouped_median,\n", " fill_value=-1,\n", " chunk=None,\n", " combine=None,\n", ")\n", "agg_median" ] }, { "cell_type": "markdown", "id": "11", "metadata": {}, "source": [ "And apply it!\n" ] }, { "cell_type": "code", "execution_count": null, "id": "12", "metadata": {}, "outputs": [], "source": [ "flox.xarray.xarray_reduce(\n", " da,\n", " da.lat,\n", " da.lon,\n", " func=agg_median,\n", " expected_groups=(lat_bins, lon_bins),\n", " isbin=(True, True),\n", " fill_value=np.nan,\n", ")" ] } ], "metadata": { "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3" } }, "nbformat": 4, "nbformat_minor": 5 }