More climatology reductions

This example is motivated by this Pangeo Discourse post and follows this notebook.

The task is to compute an hourly climatology from an hourly dataset with 744 hours in each chunk.

We choose the “map-reduce” strategy because:

  1. all hours (groups) are present in each chunk (a quick check of this follows the dataset construction below);

  2. a groupby reduction applied blockwise reduces each chunk of shape (X, Y, 744) to shape (X, Y, 24), i.e. a 744/24 = 31x decrease in chunk size, so this should work well memory-wise (see the sketch after this list).
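
To make point 2 concrete, here is a minimal sketch of the blockwise reduction applied to a single small in-memory chunk. It is an addition for illustration (the array sizes and variable names are made up) and uses flox.groupby_reduce, the in-memory core of flox.

import numpy as np

import flox

# One toy chunk with 744 hourly time steps (31 days) along the last axis.
chunk = np.ones((5, 10, 744), dtype=np.float32)
# Hour-of-day label for each time step: 0..23 repeated 31 times.
hour_of_day = np.arange(744) % 24
# Reducing over the time axis collapses 744 values to 24 hourly means.
result, groups = flox.groupby_reduce(chunk, hour_of_day, func="mean", axis=-1)
print(result.shape)  # (5, 10, 24)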

import dask.array
import numpy as np
import pandas as pd
import xarray as xr
from dask.distributed import Client
from distributed import performance_report

import flox.xarray

# Set up a local cluster.
# By default Client creates one worker per core; here we explicitly request
# 4 single-threaded workers with a 2 GiB memory limit each.
client = Client(memory_limit="2 GiB", threads_per_worker=1, n_workers=4)
client.cluster
%load_ext watermark

%watermark -iv
dask  : 2022.3.0
pandas: 1.3.5
numpy : 1.21.5
xarray: 0.20.3.dev137+g3f3a197c8

Create data

ds = xr.Dataset(
    {
        "tp": (
            ("time", "latitude", "longitude"),
            dask.array.ones((8760, 721, 1440), chunks=(744, 50, 1440), dtype=np.float32),
        )
    },
    coords={"time": pd.date_range("2021-01-01", "2021-12-31 23:59", freq="H")},
)
ds
<xarray.Dataset>
Dimensions:  (time: 8760, latitude: 721, longitude: 1440)
Coordinates:
  * time     (time) datetime64[ns] 2021-01-01 ... 2021-12-31T23:00:00
Dimensions without coordinates: latitude, longitude
Data variables:
    tp       (time, latitude, longitude) float32 dask.array<chunksize=(744, 50, 1440), meta=np.ndarray>
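
As a quick sanity check of point 1 above, the following added sketch (not part of the original notebook) confirms that every 744-hour time chunk contains all 24 hours of the day:

hours = ds.time.dt.hour.values
chunk_starts = np.cumsum((0,) + ds.tp.data.chunks[0])
for start, stop in zip(chunk_starts[:-1], chunk_starts[1:]):
    # Each 744-hour (31-day) chunk spans whole days, so all 24 hours appear.
    assert set(hours[start:stop]) == set(range(24))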

Here’s just plain xarray: 10000 tasks and one chunk per hour in the output

ds.tp.groupby("time.hour").mean()
<xarray.DataArray 'tp' (hour: 24, latitude: 721, longitude: 1440)>
dask.array<stack, shape=(24, 721, 1440), dtype=float32, chunksize=(1, 50, 1440), chunktype=numpy.ndarray>
Coordinates:
  * hour     (hour) int64 0 1 2 3 4 5 6 7 8 9 ... 14 15 16 17 18 19 20 21 22 23
Dimensions without coordinates: latitude, longitude

And flox: 600 tasks and all hours in a single chunk

hourly = flox.xarray.xarray_reduce(ds.tp, ds.time.dt.hour, func="mean")
hourly
<xarray.DataArray 'tp' (hour: 24, latitude: 721, longitude: 1440)>
dask.array<transpose, shape=(24, 721, 1440), dtype=float32, chunksize=(24, 50, 1440), chunktype=numpy.ndarray>
Coordinates:
  * hour     (hour) int64 0 1 2 3 4 5 6 7 8 9 ... 14 15 16 17 18 19 20 21 22 23
Dimensions without coordinates: latitude, longitude
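
The task counts quoted above can be checked by counting keys in each dask graph. This is an added sketch, not part of the original notebook; the exact numbers depend on the dask and xarray versions.

plain = ds.tp.groupby("time.hour").mean()
print(len(plain.data.__dask_graph__()))   # roughly 10000 tasks with plain xarray
print(len(hourly.data.__dask_graph__()))  # roughly 600 tasks with flox
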
with performance_report("hourly-climatology.html"):
    hourly.compute()

View the performance report here, and a video of the dask dashboard here.